mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2025-12-20 17:31:09 -05:00
270 lines
8.3 KiB
Python
270 lines
8.3 KiB
Python
from rest_framework import serializers
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth.password_validation import validate_password
|
|
from django.utils.crypto import get_random_string
|
|
from django.utils import timezone
|
|
from datetime import timedelta
|
|
from django.contrib.sites.shortcuts import get_current_site
|
|
from .models import User, PasswordReset
|
|
from django_forwardemail.services import EmailService
|
|
from django.template.loader import render_to_string
|
|
from typing import cast
|
|
|
|
# User model class resolved through Django's ``get_user_model()``; the
# serializers in this module use it for user lookups and creation.
UserModel = get_user_model()
|
|
|
|
|
|
class UserSerializer(serializers.ModelSerializer):
    """
    Representation of a user account returned by the API.

    Besides the model-backed fields listed in ``Meta.fields``, two values are
    computed per instance: ``avatar_url`` and ``display_name``.
    """

    avatar_url = serializers.SerializerMethodField()
    display_name = serializers.SerializerMethodField()

    class Meta:
        model = User
        fields = [
            "id",
            "username",
            "email",
            "display_name",
            "date_joined",
            "is_active",
            "avatar_url",
        ]
        read_only_fields = ["id", "date_joined", "is_active"]

    def get_avatar_url(self, obj) -> str | None:
        """Return the URL of the profile avatar, or ``None`` when there is none.

        ``None`` is returned both when ``obj`` has no ``profile`` attribute and
        when the profile's ``avatar`` is falsy.
        """
        if not hasattr(obj, "profile"):
            return None

        avatar = obj.profile.avatar
        return avatar.url if avatar else None

    def get_display_name(self, obj) -> str:
        """Return whatever ``obj.get_display_name()`` yields for this user."""
        return obj.get_display_name()
|
|
|
|
|
|
class LoginSerializer(serializers.Serializer):
    """
    Input serializer for a login attempt.

    Only checks that both credentials are present; it does not authenticate.
    """

    # Accepts either a username or an email address (see help_text).
    username = serializers.CharField(
        max_length=254, help_text="Username or email address"
    )
    # Whitespace is preserved so the password is compared exactly as typed.
    password = serializers.CharField(
        max_length=128, style={"input_type": "password"}, trim_whitespace=False
    )

    def validate(self, attrs):
        """Return ``attrs`` unchanged when both credentials are non-empty.

        Raises:
            serializers.ValidationError: if ``username`` or ``password`` is
                missing or empty.
        """
        has_credentials = attrs.get("username") and attrs.get("password")
        if not has_credentials:
            raise serializers.ValidationError(
                "Must include username/email and password."
            )
        return attrs
|
|
|
|
|
|
class SignupSerializer(serializers.ModelSerializer):
    """
    Serializer for user registration.

    Validates that the email and username are not already taken, that the
    password passes Django's ``validate_password`` and that both password
    entries match, then creates the user with a hashed password.
    """

    # trim_whitespace=False: DRF's CharField strips surrounding whitespace by
    # default, which would silently store a different password than the one
    # typed. LoginSerializer in this module does not trim, so trimming here
    # would make such passwords impossible to log in with.
    password = serializers.CharField(
        write_only=True,
        validators=[validate_password],
        style={"input_type": "password"},
        trim_whitespace=False,
    )
    # Must be trimmed (or not) exactly like ``password`` so the equality check
    # in ``validate`` compares like with like.
    password_confirm = serializers.CharField(
        write_only=True,
        style={"input_type": "password"},
        trim_whitespace=False,
    )

    class Meta:
        model = User
        fields = [
            "username",
            "email",
            "display_name",
            "password",
            "password_confirm",
        ]
        extra_kwargs = {
            "password": {"write_only": True},
            "email": {"required": True},
            "display_name": {"required": True},
        }

    def validate_email(self, value):
        """Validate email is unique (normalize and check case-insensitively).

        Returns:
            The email stripped of surrounding whitespace and lower-cased.

        Raises:
            serializers.ValidationError: if a user with this email exists.
        """
        normalized = value.strip().lower() if value is not None else value
        if UserModel.objects.filter(email__iexact=normalized).exists():
            raise serializers.ValidationError("A user with this email already exists.")
        return normalized

    def validate_username(self, value):
        """Validate username is unique (exact match).

        Raises:
            serializers.ValidationError: if a user with this username exists.
        """
        if UserModel.objects.filter(username=value).exists():
            raise serializers.ValidationError(
                "A user with this username already exists."
            )
        return value

    def validate(self, attrs):
        """Validate passwords match.

        Raises:
            serializers.ValidationError: keyed on ``password_confirm`` when
                the two password entries differ.
        """
        if attrs.get("password") != attrs.get("password_confirm"):
            raise serializers.ValidationError(
                {"password_confirm": "Passwords do not match."}
            )
        return attrs

    def create(self, validated_data):
        """Create and return a user from ``validated_data``.

        ``password_confirm`` is discarded and ``password`` is hashed through
        ``set_password`` rather than stored as given.
        """
        validated_data.pop("password_confirm", None)
        password = validated_data.pop("password")

        # Build the instance in memory and hash the password *before* the
        # first write, so the row is inserted exactly once and never exists
        # in the database without its password.
        user = UserModel(**validated_data)
        user.set_password(password)
        user.save(force_insert=True)

        return user
|
|
|
|
|
|
class PasswordResetSerializer(serializers.Serializer):
    """
    Serializer for password reset request.

    ``validate_email`` remembers the matching user (if any) on ``self.user``;
    ``save`` then stores a reset token and emails a reset link. Validation
    succeeds whether or not the email belongs to an account.
    """

    email = serializers.EmailField()

    def validate_email(self, value):
        """Normalize email and attach the user to the serializer when found (case-insensitive).

        Returns the normalized email. Does not reveal whether the email exists.
        """
        normalized = value.strip().lower() if value is not None else value

        # filter().first() instead of get(): get() raises
        # MultipleObjectsReturned when several accounts match the email
        # case-insensitively, which would surface as an unhandled error.
        # first() returns None when nothing matches.
        user = UserModel.objects.filter(email__iexact=normalized).first()
        if user is not None:
            # Only set the attribute on a match; save() checks for its
            # presence with hasattr().
            self.user = user

        return normalized

    def save(self, **kwargs):
        """Send password reset email if user exists.

        Does nothing when no user was attached during validation. Otherwise a
        64-character token valid for 24 hours is stored for the user
        (replacing the values on an existing ``PasswordReset`` row for that
        user), and, if a ``request`` is present in the serializer context, a
        reset link is emailed to the user's address.
        """
        if not hasattr(self, "user"):
            return

        # Create password reset token
        token = get_random_string(64)
        PasswordReset.objects.update_or_create(
            user=self.user,
            defaults={
                "token": token,
                "expires_at": timezone.now() + timedelta(hours=24),
                "used": False,
            },
        )

        # The request is needed to build the absolute reset URL; without it
        # the token is stored but no email is sent.
        request = self.context.get("request")
        if not request:
            return

        site = get_current_site(request)
        reset_url = f"{request.scheme}://{site.domain}/reset-password/{token}/"

        context = {
            "user": self.user,
            "reset_url": reset_url,
            "site_name": site.name,
        }

        email_html = render_to_string(
            "accounts/email/password_reset.html", context
        )

        # Narrow and validate email type for the static checker
        email = getattr(self.user, "email", None)
        if not email:
            # No recipient email; skip sending
            return

        EmailService.send_email(
            to=cast(str, email),
            subject="Reset your password",
            text=f"Click the link to reset your password: {reset_url}",
            site=site,
            html=email_html,
        )
|
|
|
|
|
|
class PasswordChangeSerializer(serializers.Serializer):
    """
    Serializer for password change.

    Requires ``request`` in the serializer context; the password is checked
    against and changed on ``request.user``.
    """

    # trim_whitespace=False on every password field: DRF's CharField strips
    # surrounding whitespace by default, so a password that legitimately
    # starts or ends with a space would fail check_password() here, or be
    # stored differently from what the user typed. LoginSerializer in this
    # module does not trim either.
    old_password = serializers.CharField(
        max_length=128,
        style={"input_type": "password"},
        trim_whitespace=False,
    )
    new_password = serializers.CharField(
        max_length=128,
        validators=[validate_password],
        style={"input_type": "password"},
        trim_whitespace=False,
    )
    new_password_confirm = serializers.CharField(
        max_length=128,
        style={"input_type": "password"},
        trim_whitespace=False,
    )

    def validate_old_password(self, value):
        """Validate old password is correct.

        Raises:
            serializers.ValidationError: if ``value`` does not match the
                current password of ``request.user``.
        """
        user = self.context["request"].user
        if not user.check_password(value):
            raise serializers.ValidationError("Old password is incorrect.")
        return value

    def validate(self, attrs):
        """Validate new passwords match.

        Raises:
            serializers.ValidationError: keyed on ``new_password_confirm``
                when the two new-password entries differ.
        """
        if attrs.get("new_password") != attrs.get("new_password_confirm"):
            raise serializers.ValidationError(
                {"new_password_confirm": "New passwords do not match."}
            )
        return attrs

    def save(self, **kwargs):
        """Change user password.

        Returns:
            The ``request.user`` instance after its password has been
            updated and saved.

        Raises:
            serializers.ValidationError: if no validated new password is
                available.
        """
        user = self.context["request"].user

        # Only ever take the password from validated_data. Falling back to
        # the raw initial_data would set a password that skipped
        # validate_password and the confirmation check.
        validated = getattr(self, "validated_data", None)
        new_password = (
            validated.get("new_password") if isinstance(validated, dict) else None
        )

        if not new_password:
            raise serializers.ValidationError("New password is required.")

        user.set_password(new_password)
        user.save()

        return user
|
|
|
|
|
|
class SocialProviderSerializer(serializers.Serializer):
    """
    Serializer for social authentication providers.

    Declares three fields: ``id`` and ``name`` as strings and ``login_url``
    as a URL. Each field is declared once; the repeated ``name`` and
    ``login_url`` declarations were redundant and have been dropped.
    """

    id = serializers.CharField()
    name = serializers.CharField()
    login_url = serializers.URLField()
|