mirror of
https://github.com/pacnpal/thrillwiki_django_no_react.git
synced 2025-12-20 08:51:09 -05:00
258 lines
9.0 KiB
Python
258 lines
9.0 KiB
Python
"""
|
|
Social Provider Management Service
|
|
|
|
This service handles the business logic for connecting and disconnecting
|
|
social authentication providers while ensuring users never lock themselves
|
|
out of their accounts.
|
|
"""
|
|
|
|
from typing import Dict, List, Tuple, TYPE_CHECKING
|
|
from django.contrib.auth import get_user_model
|
|
from allauth.socialaccount.models import SocialApp
|
|
from allauth.socialaccount.providers import registry
|
|
from django.contrib.sites.shortcuts import get_current_site
|
|
from django.http import HttpRequest
|
|
import logging
|
|
|
|
if TYPE_CHECKING:
|
|
from apps.accounts.models import User
|
|
else:
|
|
User = get_user_model()
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SocialProviderService:
|
|
"""Service for managing social provider connections."""
|
|
|
|
@staticmethod
|
|
def can_disconnect_provider(user: User, provider: str) -> Tuple[bool, str]:
|
|
"""
|
|
Check if a user can safely disconnect a social provider.
|
|
|
|
Args:
|
|
user: The user attempting to disconnect
|
|
provider: The provider to disconnect (e.g., 'google', 'discord')
|
|
|
|
Returns:
|
|
Tuple of (can_disconnect: bool, reason: str)
|
|
"""
|
|
try:
|
|
# Count remaining social accounts after disconnection
|
|
remaining_social_accounts = user.socialaccount_set.exclude(
|
|
provider=provider
|
|
).count()
|
|
|
|
# Check if user has email/password auth
|
|
has_password_auth = (
|
|
user.email and
|
|
user.has_usable_password() and
|
|
bool(user.password) # Not empty/unusable
|
|
)
|
|
|
|
# Allow disconnection only if alternative auth exists
|
|
can_disconnect = remaining_social_accounts > 0 or has_password_auth
|
|
|
|
if not can_disconnect:
|
|
if remaining_social_accounts == 0 and not has_password_auth:
|
|
return False, "Cannot disconnect your only authentication method. Please set up a password or connect another social provider first."
|
|
elif not has_password_auth:
|
|
return False, "Please set up email/password authentication before disconnecting this provider."
|
|
else:
|
|
return False, "Cannot disconnect this provider at this time."
|
|
|
|
return True, "Provider can be safely disconnected."
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error checking disconnect permission for user {user.id}, provider {provider}: {e}")
|
|
return False, "Unable to verify disconnection safety. Please try again."
|
|
|
|
@staticmethod
|
|
def get_connected_providers(user: "User") -> List[Dict]:
|
|
"""
|
|
Get all social providers connected to a user's account.
|
|
|
|
Args:
|
|
user: The user to check
|
|
|
|
Returns:
|
|
List of connected provider information
|
|
"""
|
|
try:
|
|
connected_providers = []
|
|
|
|
for social_account in user.socialaccount_set.all():
|
|
can_disconnect, reason = SocialProviderService.can_disconnect_provider(
|
|
user, social_account.provider
|
|
)
|
|
|
|
provider_info = {
|
|
'provider': social_account.provider,
|
|
'provider_name': social_account.get_provider().name,
|
|
'uid': social_account.uid,
|
|
'date_joined': social_account.date_joined,
|
|
'can_disconnect': can_disconnect,
|
|
'disconnect_reason': reason if not can_disconnect else None,
|
|
'extra_data': social_account.extra_data
|
|
}
|
|
|
|
connected_providers.append(provider_info)
|
|
|
|
return connected_providers
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting connected providers for user {user.id}: {e}")
|
|
return []
|
|
|
|
@staticmethod
|
|
def get_available_providers(request: HttpRequest) -> List[Dict]:
|
|
"""
|
|
Get all available social providers for the current site.
|
|
|
|
Args:
|
|
request: The HTTP request
|
|
|
|
Returns:
|
|
List of available provider information
|
|
"""
|
|
try:
|
|
site = get_current_site(request)
|
|
available_providers = []
|
|
|
|
# Get all social apps configured for this site
|
|
social_apps = SocialApp.objects.filter(sites=site).order_by('provider')
|
|
|
|
for social_app in social_apps:
|
|
try:
|
|
provider = registry.by_id(social_app.provider)
|
|
|
|
provider_info = {
|
|
'id': social_app.provider,
|
|
'name': provider.name,
|
|
'auth_url': request.build_absolute_uri(
|
|
f'/accounts/{social_app.provider}/login/'
|
|
),
|
|
'connect_url': request.build_absolute_uri(
|
|
f'/api/v1/auth/social/connect/{social_app.provider}/'
|
|
)
|
|
}
|
|
|
|
available_providers.append(provider_info)
|
|
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Error processing provider {social_app.provider}: {e}")
|
|
continue
|
|
|
|
return available_providers
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting available providers: {e}")
|
|
return []
|
|
|
|
@staticmethod
|
|
def disconnect_provider(user: "User", provider: str) -> Tuple[bool, str]:
|
|
"""
|
|
Disconnect a social provider from a user's account.
|
|
|
|
Args:
|
|
user: The user to disconnect from
|
|
provider: The provider to disconnect
|
|
|
|
Returns:
|
|
Tuple of (success: bool, message: str)
|
|
"""
|
|
try:
|
|
# First check if disconnection is allowed
|
|
can_disconnect, reason = SocialProviderService.can_disconnect_provider(
|
|
user, provider)
|
|
|
|
if not can_disconnect:
|
|
return False, reason
|
|
|
|
# Find and delete the social account
|
|
social_accounts = user.socialaccount_set.filter(provider=provider)
|
|
|
|
if not social_accounts.exists():
|
|
return False, f"No {provider} account found to disconnect."
|
|
|
|
# Delete all social accounts for this provider (in case of duplicates)
|
|
deleted_count = social_accounts.count()
|
|
social_accounts.delete()
|
|
|
|
logger.info(
|
|
f"User {user.id} disconnected {deleted_count} {provider} account(s)")
|
|
|
|
return True, f"{provider.title()} account disconnected successfully."
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error disconnecting {provider} for user {user.id}: {e}")
|
|
return False, f"Failed to disconnect {provider} account. Please try again."
|
|
|
|
@staticmethod
|
|
def get_auth_status(user: "User") -> Dict:
|
|
"""
|
|
Get comprehensive authentication status for a user.
|
|
|
|
Args:
|
|
user: The user to check
|
|
|
|
Returns:
|
|
Dictionary with authentication status information
|
|
"""
|
|
try:
|
|
connected_providers = SocialProviderService.get_connected_providers(user)
|
|
|
|
has_password_auth = (
|
|
user.email and
|
|
user.has_usable_password() and
|
|
bool(user.password)
|
|
)
|
|
|
|
auth_methods_count = len(connected_providers) + \
|
|
(1 if has_password_auth else 0)
|
|
|
|
return {
|
|
'user_id': user.id,
|
|
'username': user.username,
|
|
'email': user.email,
|
|
'has_password_auth': has_password_auth,
|
|
'connected_providers': connected_providers,
|
|
'total_auth_methods': auth_methods_count,
|
|
'can_disconnect_any': auth_methods_count > 1,
|
|
'requires_password_setup': not has_password_auth and len(connected_providers) == 1
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting auth status for user {user.id}: {e}")
|
|
return {
|
|
'error': 'Unable to retrieve authentication status'
|
|
}
|
|
|
|
@staticmethod
|
|
def validate_provider_exists(provider: str) -> Tuple[bool, str]:
|
|
"""
|
|
Validate that a social provider is configured and available.
|
|
|
|
Args:
|
|
provider: The provider ID to validate
|
|
|
|
Returns:
|
|
Tuple of (is_valid: bool, message: str)
|
|
"""
|
|
try:
|
|
# Check if provider is registered with allauth
|
|
if provider not in registry.provider_map:
|
|
return False, f"Provider '{provider}' is not supported."
|
|
|
|
# Check if provider has a social app configured
|
|
if not SocialApp.objects.filter(provider=provider).exists():
|
|
return False, f"Provider '{provider}' is not configured on this site."
|
|
|
|
return True, f"Provider '{provider}' is valid and available."
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error validating provider {provider}: {e}")
|
|
return False, "Unable to validate provider."
|