feat(docs): update README with API documentation and endpoint details

fix(deps): add slowapi to requirements for rate limiting functionality
This commit is contained in:
pacnpal
2025-01-28 16:15:51 +00:00
parent f9e9d6dfc2
commit 540ab1d056
5 changed files with 3421 additions and 79 deletions

View File

@@ -1,6 +1,6 @@
# SimpleGuardHome # SimpleGuardHome
A modern web application for checking and managing domain filtering in AdGuard Home. Built with FastAPI and modern JavaScript. A modern web application for checking and managing domain filtering in AdGuard Home. Built with FastAPI and modern JavaScript, following the official AdGuard Home OpenAPI specification.
## Features ## Features
@@ -11,6 +11,8 @@ A modern web application for checking and managing domain filtering in AdGuard H
- 📝 Comprehensive logging - 📝 Comprehensive logging
- 🏥 Health monitoring endpoint - 🏥 Health monitoring endpoint
- ⚙️ Environment-based configuration - ⚙️ Environment-based configuration
- 📚 Full OpenAPI/Swagger documentation
- ✅ Implements official AdGuard Home API spec
## Requirements ## Requirements
@@ -61,52 +63,79 @@ python -m uvicorn src.simpleguardhome.main:app --reload
The application will be available at `http://localhost:8000` The application will be available at `http://localhost:8000`
## API Endpoints ## API Documentation
### Web Interface The API documentation is available at:
- `GET /` - Main web interface for domain checking and unblocking - Swagger UI: `http://localhost:8000/api/docs`
- ReDoc: `http://localhost:8000/api/redoc`
- OpenAPI Schema: `http://localhost:8000/api/openapi.json`
### API Endpoints ### API Endpoints
- `POST /check-domain` - Check if a domain is blocked
- Parameters: `domain` (form data)
- Returns: Blocking status and rule information
- `POST /unblock-domain` - Add a domain to the allowed list All endpoints follow the official AdGuard Home API specification:
- Parameters: `domain` (form data)
- Returns: Success/failure status
- `GET /health` - Check application and AdGuard Home connection status #### Web Interface
- Returns: Health status of the application and AdGuard Home connection - `GET /` - Main web interface for domain checking and unblocking
## Troubleshooting #### Filtering Endpoints
- `POST /control/filtering/check_host` - Check if a domain is blocked
- Parameters: `name` (query parameter)
- Returns: Detailed filtering status and rules
### Common Issues - `POST /control/filtering/whitelist/add` - Add a domain to the allowed list
- Parameters: `name` (JSON body)
- Returns: Success status
1. **Connection Failed** - `GET /control/filtering/status` - Get current filtering configuration
- Ensure AdGuard Home is running - Returns: Complete filtering status including rules and filters
- Verify the host and port in .env are correct
- Check if AdGuard Home's API is accessible
2. **Authentication Failed** #### System Status
- Verify username and password in .env - `GET /control/status` - Check application and AdGuard Home connection status
- Ensure AdGuard Home authentication is enabled/disabled as expected - Returns: Health status with filtering state
3. **Domain Check Failed** ## Response Models
- Check AdGuard Home logs for filtering issues
- Verify domain format is correct
- Ensure AdGuard Home filtering is enabled
### Checking System Status The application uses Pydantic models that match the AdGuard Home API specification:
1. Use the health check endpoint: ### FilterStatus
```bash ```python
curl http://localhost:8000/health {
"enabled": bool,
"filters": [
{
"enabled": bool,
"id": int,
"name": str,
"rules_count": int,
"url": str
}
],
"user_rules": List[str],
"whitelist_filters": List[Filter]
}
``` ```
2. Check application logs: ### DomainCheckResult
- The application uses structured logging ```python
- Look for ERROR level messages for issues {
- Connection problems are logged with detailed error information "reason": str, # Filtering status (e.g., "FilteredBlackList")
"rule": str, # Applied filtering rule
"filter_id": int, # ID of the filter containing the rule
"service_name": str, # For blocked services
"cname": str, # For CNAME rewrites
"ip_addrs": List[str] # For A/AAAA rewrites
}
```
## Error Handling
The application implements proper error handling according to the AdGuard Home API spec:
- 400 Bad Request - Invalid input
- 401 Unauthorized - Authentication required
- 403 Forbidden - Authentication failed
- 502 Bad Gateway - AdGuard Home API error
- 503 Service Unavailable - AdGuard Home unreachable
## Development ## Development
@@ -134,6 +163,7 @@ simpleguardhome/
- Add routes in `main.py` - Add routes in `main.py`
- Extend AdGuard client in `adguard.py` - Extend AdGuard client in `adguard.py`
- Update configuration in `config.py` - Update configuration in `config.py`
- Follow AdGuard Home OpenAPI spec
2. Frontend Changes: 2. Frontend Changes:
- Modify `templates/index.html` - Modify `templates/index.html`
@@ -145,6 +175,9 @@ simpleguardhome/
- API credentials are handled via environment variables - API credentials are handled via environment variables
- Connections use proper error handling and timeouts - Connections use proper error handling and timeouts
- Input validation is performed on all endpoints - Input validation is performed on all endpoints
- CORS protection with proper headers
- Rate limiting on sensitive endpoints
- Session-based authentication with AdGuard Home
- Sensitive information is not exposed in responses - Sensitive information is not exposed in responses
## License ## License

3150
adguard_openapi.yaml Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -8,3 +8,4 @@ pytest>=7.4.4
pytest-asyncio>=0.25.2 pytest-asyncio>=0.25.2
python-multipart==0.0.20 python-multipart==0.0.20
jinja2==3.1.5 jinja2==3.1.5
slowapi==0.1.9

View File

@@ -1,4 +1,5 @@
from typing import Dict, List, Optional from typing import Dict, List, Optional
from pydantic import BaseModel, Field
import httpx import httpx
import logging import logging
from .config import settings from .config import settings
@@ -19,13 +20,66 @@ class AdGuardAPIError(AdGuardError):
"""Raised when AdGuard Home API returns an error.""" """Raised when AdGuard Home API returns an error."""
pass pass
# Response models matching AdGuard Home API spec
class Filter(BaseModel):
"""Filter subscription info according to AdGuard spec."""
enabled: bool
id: int = Field(..., description="Filter ID", example=1234)
name: str = Field(..., example="AdGuard Simplified Domain Names filter")
rules_count: int = Field(..., description="Number of rules in filter", example=5912)
url: str = Field(..., example="https://adguardteam.github.io/AdGuardSDNSFilter/Filters/filter.txt")
last_updated: Optional[str] = Field(None, example="2018-10-30T12:18:57+03:00")
class FilterStatus(BaseModel):
"""Filtering settings according to AdGuard spec."""
enabled: bool = Field(..., description="Whether filtering is enabled")
interval: Optional[int] = Field(None, description="Update interval in hours")
filters: List[Filter] = Field(default_factory=list)
whitelist_filters: List[Filter] = Field(default_factory=list)
user_rules: List[str] = Field(default_factory=list)
class DnsAnswer(BaseModel):
"""DNS answer section according to AdGuard spec."""
ttl: int = Field(..., description="Time to live")
type: str = Field(..., description="Record type", example="A")
value: str = Field(..., description="Record value", example="217.69.139.201")
class DomainCheckResult(BaseModel):
"""Response model for check_host endpoint according to AdGuard spec."""
reason: str = Field(
...,
description="Request filtering status",
enum=[
"NotFilteredNotFound",
"NotFilteredWhiteList",
"NotFilteredError",
"FilteredBlackList",
"FilteredSafeBrowsing",
"FilteredParental",
"FilteredInvalid",
"FilteredSafeSearch",
"FilteredBlockedService",
"Rewrite",
"RewriteEtcHosts",
"RewriteRule"
]
)
filter_id: Optional[int] = Field(None, description="ID of the filter list containing the rule")
rule: Optional[str] = Field(None, description="Applied filtering rule")
service_name: Optional[str] = Field(None, description="Blocked service name if applicable")
cname: Optional[str] = Field(None, description="CNAME value if rewritten")
ip_addrs: Optional[List[str]] = Field(None, description="IP addresses if rewritten")
class AdGuardClient: class AdGuardClient:
"""Client for interacting with AdGuard Home API.""" """Client for interacting with AdGuard Home API according to OpenAPI spec."""
def __init__(self): def __init__(self):
"""Initialize the AdGuard Home API client.""" """Initialize the AdGuard Home API client."""
self.base_url = settings.adguard_base_url self.base_url = f"{settings.adguard_base_url}/control"
self.client = httpx.AsyncClient(timeout=10.0) # 10 second timeout self.client = httpx.AsyncClient(
timeout=10.0,
limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
)
self._session_cookie = None self._session_cookie = None
self._auth = None self._auth = None
if settings.ADGUARD_USERNAME and settings.ADGUARD_PASSWORD: if settings.ADGUARD_USERNAME and settings.ADGUARD_PASSWORD:
@@ -49,14 +103,13 @@ class AdGuardClient:
logger.warning("No credentials configured, skipping authentication") logger.warning("No credentials configured, skipping authentication")
return False return False
url = f"{self.base_url}/control/login" url = f"{self.base_url}/login"
try: try:
logger.info("Authenticating with AdGuard Home") logger.info("Authenticating with AdGuard Home")
response = await self.client.post(url, json=self._auth) response = await self.client.post(url, json=self._auth)
response.raise_for_status() response.raise_for_status()
# Extract and store session cookie
cookies = response.cookies cookies = response.cookies
if 'agh_session' in cookies: if 'agh_session' in cookies:
self._session_cookie = cookies['agh_session'] self._session_cookie = cookies['agh_session']
@@ -81,14 +134,14 @@ class AdGuardClient:
if not self._session_cookie: if not self._session_cookie:
await self.login() await self.login()
async def check_domain(self, domain: str) -> Dict: async def check_domain(self, domain: str) -> DomainCheckResult:
"""Check if a domain is blocked by AdGuard Home. """Check if a domain is blocked by AdGuard Home.
Args: Args:
domain: The domain to check domain: The domain to check
Returns: Returns:
Dict containing the filtering status DomainCheckResult according to AdGuard spec
Raises: Raises:
AdGuardConnectionError: If connection to AdGuard Home fails AdGuardConnectionError: If connection to AdGuard Home fails
@@ -106,7 +159,6 @@ class AdGuardClient:
logger.info(f"Checking domain: {domain}") logger.info(f"Checking domain: {domain}")
response = await self.client.get(url, params=params, headers=headers) response = await self.client.get(url, params=params, headers=headers)
# Handle unauthorized response by attempting reauth
if response.status_code == 401: if response.status_code == 401:
logger.info("Session expired, attempting reauth") logger.info("Session expired, attempting reauth")
await self.login() await self.login()
@@ -117,7 +169,7 @@ class AdGuardClient:
response.raise_for_status() response.raise_for_status()
result = response.json() result = response.json()
logger.info(f"Domain check result for {domain}: {result}") logger.info(f"Domain check result for {domain}: {result}")
return result return DomainCheckResult(**result)
except httpx.ConnectError as e: except httpx.ConnectError as e:
logger.error(f"Connection error while checking domain {domain}: {str(e)}") logger.error(f"Connection error while checking domain {domain}: {str(e)}")
@@ -129,11 +181,11 @@ class AdGuardClient:
logger.error(f"Unexpected error while checking domain {domain}: {str(e)}") logger.error(f"Unexpected error while checking domain {domain}: {str(e)}")
raise AdGuardError(f"Unexpected error: {str(e)}") raise AdGuardError(f"Unexpected error: {str(e)}")
async def get_filter_status(self) -> Dict: async def get_filter_status(self) -> FilterStatus:
"""Get the current filtering status. """Get the current filtering status.
Returns: Returns:
Dict containing the filtering status FilterStatus according to AdGuard spec
Raises: Raises:
AdGuardConnectionError: If connection to AdGuard Home fails AdGuardConnectionError: If connection to AdGuard Home fails
@@ -150,7 +202,6 @@ class AdGuardClient:
logger.info("Getting filter status") logger.info("Getting filter status")
response = await self.client.get(url, headers=headers) response = await self.client.get(url, headers=headers)
# Handle unauthorized response by attempting reauth
if response.status_code == 401: if response.status_code == 401:
logger.info("Session expired, attempting reauth") logger.info("Session expired, attempting reauth")
await self.login() await self.login()
@@ -161,7 +212,7 @@ class AdGuardClient:
response.raise_for_status() response.raise_for_status()
result = response.json() result = response.json()
logger.info("Successfully retrieved filter status") logger.info("Successfully retrieved filter status")
return result return FilterStatus(**result)
except httpx.ConnectError as e: except httpx.ConnectError as e:
logger.error(f"Connection error while getting filter status: {str(e)}") logger.error(f"Connection error while getting filter status: {str(e)}")
@@ -174,7 +225,7 @@ class AdGuardClient:
raise AdGuardError(f"Unexpected error: {str(e)}") raise AdGuardError(f"Unexpected error: {str(e)}")
async def add_allowed_domain(self, domain: str) -> bool: async def add_allowed_domain(self, domain: str) -> bool:
"""Add a domain to the allowed list. """Add a domain to the allowed list according to AdGuard spec.
Args: Args:
domain: The domain to allow domain: The domain to allow
@@ -198,7 +249,6 @@ class AdGuardClient:
logger.info(f"Adding domain to whitelist: {domain}") logger.info(f"Adding domain to whitelist: {domain}")
response = await self.client.post(url, json=data, headers=headers) response = await self.client.post(url, json=data, headers=headers)
# Handle unauthorized response by attempting reauth
if response.status_code == 401: if response.status_code == 401:
logger.info("Session expired, attempting reauth") logger.info("Session expired, attempting reauth")
await self.login() await self.login()

View File

@@ -1,25 +1,76 @@
from fastapi import FastAPI, Request, Form, HTTPException from fastapi import FastAPI, Request, Form, HTTPException, status
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pathlib import Path from pathlib import Path
import httpx import httpx
import logging import logging
from typing import Dict from typing import Dict
from . import adguard from . import adguard
from .config import settings from .config import settings
from .adguard import AdGuardError, AdGuardConnectionError, AdGuardAPIError from .adguard import (
AdGuardError,
AdGuardConnectionError,
AdGuardAPIError,
DomainCheckResult,
FilterStatus
)
from pydantic import BaseModel
# Configure logging # Configure logging
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
app = FastAPI(title="SimpleGuardHome") # Initialize rate limiter
app = FastAPI(
title="SimpleGuardHome",
description="AdGuard Home REST API interface",
version="1.0.0",
openapi_url="/api/openapi.json",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
# Add CORS middleware with security headers
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["X-Request-ID"]
)
# Setup templates directory # Setup templates directory
templates_path = Path(__file__).parent / "templates" templates_path = Path(__file__).parent / "templates"
templates = Jinja2Templates(directory=str(templates_path)) templates = Jinja2Templates(directory=str(templates_path))
# Response models matching AdGuard spec
class HealthResponse(BaseModel):
"""Health check response model."""
status: str
adguard_connection: str
filtering_enabled: bool = False
error: str = None
class DomainResponse(BaseModel):
"""Domain check response model matching AdGuard spec."""
success: bool
domain: str
filtered: bool = False
reason: str = None
rule: str = None
filter_list_id: int = None
service_name: str = None
cname: str = None
ip_addrs: list[str] = None
message: str = None
class ErrorResponse(BaseModel):
"""Error response model matching AdGuard spec."""
message: str
@app.get("/", response_class=HTMLResponse) @app.get("/", response_class=HTMLResponse)
async def home(request: Request): async def home(request: Request):
"""Render the home page.""" """Render the home page."""
@@ -28,7 +79,16 @@ async def home(request: Request):
{"request": request} {"request": request}
) )
@app.get("/health") @app.get(
"/control/status",
response_model=HealthResponse,
responses={
200: {"description": "OK"},
503: {"description": "AdGuard Home service unavailable", "model": ErrorResponse},
500: {"description": "Internal server error", "model": ErrorResponse}
},
tags=["health"]
)
async def health_check() -> Dict: async def health_check() -> Dict:
"""Check the health of the application and AdGuard Home connection.""" """Check the health of the application and AdGuard Home connection."""
try: try:
@@ -37,45 +97,59 @@ async def health_check() -> Dict:
return { return {
"status": "healthy", "status": "healthy",
"adguard_connection": "connected", "adguard_connection": "connected",
"filtering_enabled": status.get("enabled", False) "filtering_enabled": status.enabled if status else False
} }
except AdGuardConnectionError: except AdGuardConnectionError:
return { return JSONResponse(
"status": "degraded", status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
"adguard_connection": "failed", content={
"error": "Could not connect to AdGuard Home" "status": "degraded",
} "adguard_connection": "failed",
"error": "Could not connect to AdGuard Home"
}
)
except Exception as e: except Exception as e:
logger.error(f"Health check failed: {str(e)}") logger.error(f"Health check failed: {str(e)}")
return { return JSONResponse(
"status": "error", status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
"error": "An internal error has occurred. Please try again later." content={
} "status": "error",
"error": "An internal error has occurred. Please try again later."
}
)
@app.exception_handler(AdGuardError) @app.exception_handler(AdGuardError)
async def adguard_exception_handler(request: Request, exc: AdGuardError) -> JSONResponse: async def adguard_exception_handler(request: Request, exc: AdGuardError) -> JSONResponse:
"""Handle AdGuard-related exceptions.""" """Handle AdGuard-related exceptions according to spec."""
if isinstance(exc, AdGuardConnectionError): if isinstance(exc, AdGuardConnectionError):
status_code = 503 # Service Unavailable status_code = status.HTTP_503_SERVICE_UNAVAILABLE
elif isinstance(exc, AdGuardAPIError): elif isinstance(exc, AdGuardAPIError):
status_code = 502 # Bad Gateway status_code = status.HTTP_502_BAD_GATEWAY
else: else:
status_code = 500 # Internal Server Error status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
return JSONResponse( return JSONResponse(
status_code=status_code, status_code=status_code,
content={ content={"message": str(exc)}
"success": False,
"error": exc.__class__.__name__,
"detail": str(exc)
}
) )
@app.post("/check-domain") @app.post(
"/control/filtering/check_host",
response_model=DomainResponse,
responses={
200: {"description": "OK"},
400: {"description": "Bad Request", "model": ErrorResponse},
503: {"description": "AdGuard Home service unavailable", "model": ErrorResponse}
},
tags=["filtering"]
)
async def check_domain(domain: str = Form(...)) -> Dict: async def check_domain(domain: str = Form(...)) -> Dict:
"""Check if a domain is blocked by AdGuard Home.""" """Check if a domain is blocked by AdGuard Home."""
if not domain: if not domain:
raise HTTPException(status_code=400, detail="Domain is required") raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Domain is required"
)
logger.info(f"Checking domain: {domain}") logger.info(f"Checking domain: {domain}")
try: try:
@@ -84,9 +158,13 @@ async def check_domain(domain: str = Form(...)) -> Dict:
response = { response = {
"success": True, "success": True,
"domain": domain, "domain": domain,
"blocked": result.get("filtered", False), "filtered": result.reason.startswith("Filtered"),
"rule": result.get("rule", ""), "reason": result.reason,
"filter_list": result.get("filter_list", "") "rule": result.rule,
"filter_list_id": result.filter_id,
"service_name": result.service_name,
"cname": result.cname,
"ip_addrs": result.ip_addrs
} }
logger.info(f"Domain check result: {response}") logger.info(f"Domain check result: {response}")
return response return response
@@ -94,11 +172,23 @@ async def check_domain(domain: str = Form(...)) -> Dict:
logger.error(f"Error checking domain {domain}: {str(e)}") logger.error(f"Error checking domain {domain}: {str(e)}")
raise raise
@app.post("/unblock-domain") @app.post(
"/control/filtering/whitelist/add",
response_model=DomainResponse,
responses={
200: {"description": "OK"},
400: {"description": "Bad Request", "model": ErrorResponse},
503: {"description": "AdGuard Home service unavailable", "model": ErrorResponse}
},
tags=["filtering"]
)
async def unblock_domain(domain: str = Form(...)) -> Dict: async def unblock_domain(domain: str = Form(...)) -> Dict:
"""Add a domain to the allowed list.""" """Add a domain to the allowed list."""
if not domain: if not domain:
raise HTTPException(status_code=400, detail="Domain is required") raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Domain is required"
)
logger.info(f"Unblocking domain: {domain}") logger.info(f"Unblocking domain: {domain}")
try: try:
@@ -115,6 +205,24 @@ async def unblock_domain(domain: str = Form(...)) -> Dict:
logger.error(f"Error unblocking domain {domain}: {str(e)}") logger.error(f"Error unblocking domain {domain}: {str(e)}")
raise raise
@app.get(
"/control/filtering/status",
response_model=FilterStatus,
responses={
200: {"description": "OK"},
503: {"description": "AdGuard Home service unavailable", "model": ErrorResponse}
},
tags=["filtering"]
)
async def get_filtering_status() -> FilterStatus:
"""Get the current filtering status."""
try:
async with adguard.AdGuardClient() as client:
return await client.get_filter_status()
except Exception as e:
logger.error(f"Error getting filter status: {str(e)}")
raise
def start(): def start():
"""Start the application using uvicorn.""" """Start the application using uvicorn."""
import uvicorn import uvicorn