Files
thrillwiki_django_no_react/apps/accounts/services/social_provider_service.py
2025-09-21 20:19:12 -04:00

258 lines
9.0 KiB
Python

"""
Social Provider Management Service
This service handles the business logic for connecting and disconnecting
social authentication providers while ensuring users never lock themselves
out of their accounts.
"""
from typing import Dict, List, Tuple, TYPE_CHECKING
from django.contrib.auth import get_user_model
from allauth.socialaccount.models import SocialApp
from allauth.socialaccount.providers import registry
from django.contrib.sites.shortcuts import get_current_site
from django.http import HttpRequest
import logging
if TYPE_CHECKING:
from apps.accounts.models import User
else:
User = get_user_model()
logger = logging.getLogger(__name__)
class SocialProviderService:
"""Service for managing social provider connections."""
@staticmethod
def can_disconnect_provider(user: User, provider: str) -> Tuple[bool, str]:
"""
Check if a user can safely disconnect a social provider.
Args:
user: The user attempting to disconnect
provider: The provider to disconnect (e.g., 'google', 'discord')
Returns:
Tuple of (can_disconnect: bool, reason: str)
"""
try:
# Count remaining social accounts after disconnection
remaining_social_accounts = user.socialaccount_set.exclude(
provider=provider
).count()
# Check if user has email/password auth
has_password_auth = (
user.email and
user.has_usable_password() and
bool(user.password) # Not empty/unusable
)
# Allow disconnection only if alternative auth exists
can_disconnect = remaining_social_accounts > 0 or has_password_auth
if not can_disconnect:
if remaining_social_accounts == 0 and not has_password_auth:
return False, "Cannot disconnect your only authentication method. Please set up a password or connect another social provider first."
elif not has_password_auth:
return False, "Please set up email/password authentication before disconnecting this provider."
else:
return False, "Cannot disconnect this provider at this time."
return True, "Provider can be safely disconnected."
except Exception as e:
logger.error(
f"Error checking disconnect permission for user {user.id}, provider {provider}: {e}")
return False, "Unable to verify disconnection safety. Please try again."
@staticmethod
def get_connected_providers(user: "User") -> List[Dict]:
"""
Get all social providers connected to a user's account.
Args:
user: The user to check
Returns:
List of connected provider information
"""
try:
connected_providers = []
for social_account in user.socialaccount_set.all():
can_disconnect, reason = SocialProviderService.can_disconnect_provider(
user, social_account.provider
)
provider_info = {
'provider': social_account.provider,
'provider_name': social_account.get_provider().name,
'uid': social_account.uid,
'date_joined': social_account.date_joined,
'can_disconnect': can_disconnect,
'disconnect_reason': reason if not can_disconnect else None,
'extra_data': social_account.extra_data
}
connected_providers.append(provider_info)
return connected_providers
except Exception as e:
logger.error(f"Error getting connected providers for user {user.id}: {e}")
return []
@staticmethod
def get_available_providers(request: HttpRequest) -> List[Dict]:
"""
Get all available social providers for the current site.
Args:
request: The HTTP request
Returns:
List of available provider information
"""
try:
site = get_current_site(request)
available_providers = []
# Get all social apps configured for this site
social_apps = SocialApp.objects.filter(sites=site).order_by('provider')
for social_app in social_apps:
try:
provider = registry.by_id(social_app.provider)
provider_info = {
'id': social_app.provider,
'name': provider.name,
'auth_url': request.build_absolute_uri(
f'/accounts/{social_app.provider}/login/'
),
'connect_url': request.build_absolute_uri(
f'/api/v1/auth/social/connect/{social_app.provider}/'
)
}
available_providers.append(provider_info)
except Exception as e:
logger.warning(
f"Error processing provider {social_app.provider}: {e}")
continue
return available_providers
except Exception as e:
logger.error(f"Error getting available providers: {e}")
return []
@staticmethod
def disconnect_provider(user: "User", provider: str) -> Tuple[bool, str]:
"""
Disconnect a social provider from a user's account.
Args:
user: The user to disconnect from
provider: The provider to disconnect
Returns:
Tuple of (success: bool, message: str)
"""
try:
# First check if disconnection is allowed
can_disconnect, reason = SocialProviderService.can_disconnect_provider(
user, provider)
if not can_disconnect:
return False, reason
# Find and delete the social account
social_accounts = user.socialaccount_set.filter(provider=provider)
if not social_accounts.exists():
return False, f"No {provider} account found to disconnect."
# Delete all social accounts for this provider (in case of duplicates)
deleted_count = social_accounts.count()
social_accounts.delete()
logger.info(
f"User {user.id} disconnected {deleted_count} {provider} account(s)")
return True, f"{provider.title()} account disconnected successfully."
except Exception as e:
logger.error(f"Error disconnecting {provider} for user {user.id}: {e}")
return False, f"Failed to disconnect {provider} account. Please try again."
@staticmethod
def get_auth_status(user: "User") -> Dict:
"""
Get comprehensive authentication status for a user.
Args:
user: The user to check
Returns:
Dictionary with authentication status information
"""
try:
connected_providers = SocialProviderService.get_connected_providers(user)
has_password_auth = (
user.email and
user.has_usable_password() and
bool(user.password)
)
auth_methods_count = len(connected_providers) + \
(1 if has_password_auth else 0)
return {
'user_id': user.id,
'username': user.username,
'email': user.email,
'has_password_auth': has_password_auth,
'connected_providers': connected_providers,
'total_auth_methods': auth_methods_count,
'can_disconnect_any': auth_methods_count > 1,
'requires_password_setup': not has_password_auth and len(connected_providers) == 1
}
except Exception as e:
logger.error(f"Error getting auth status for user {user.id}: {e}")
return {
'error': 'Unable to retrieve authentication status'
}
@staticmethod
def validate_provider_exists(provider: str) -> Tuple[bool, str]:
"""
Validate that a social provider is configured and available.
Args:
provider: The provider ID to validate
Returns:
Tuple of (is_valid: bool, message: str)
"""
try:
# Check if provider is registered with allauth
if provider not in registry.provider_map:
return False, f"Provider '{provider}' is not supported."
# Check if provider has a social app configured
if not SocialApp.objects.filter(provider=provider).exists():
return False, f"Provider '{provider}' is not configured on this site."
return True, f"Provider '{provider}' is valid and available."
except Exception as e:
logger.error(f"Error validating provider {provider}: {e}")
return False, "Unable to validate provider."