Implement reaction history context handling; enhance HTTP server initialization and logging; add static file serving
This commit is contained in:
@@ -25,8 +25,7 @@ from .queue_manager import QueueManager
|
||||
from .api import APIManager
|
||||
from .handlers import MessageHandler, ImageHandler, ToolHandler, EventHandler
|
||||
from .training import TrainingManager
|
||||
from .http_server import HTTPServer
|
||||
from aiohttp import web
|
||||
from .web.app import init_app, run_webserver
|
||||
|
||||
|
||||
class DiscordBot:
|
||||
@@ -49,93 +48,71 @@ class DiscordBot:
|
||||
self.event_handler = None
|
||||
self.training_manager = TrainingManager() # Initialize training manager
|
||||
self.http_server = None
|
||||
self.internal_app = web.Application()
|
||||
self.internal_runner = None
|
||||
|
||||
async def _initialize_services(self) -> None:
|
||||
"""Initialize API and queue services."""
|
||||
try:
|
||||
async with self._init_lock:
|
||||
if not self._initialized:
|
||||
# Initialize database first
|
||||
await self.db_manager.init_db()
|
||||
logger.info("Database initialized")
|
||||
try:
|
||||
# Initialize database first
|
||||
await self.db_manager.init_db()
|
||||
logger.info("Database initialized")
|
||||
|
||||
# Initialize all handlers
|
||||
self.message_handler = MessageHandler(self.db_manager)
|
||||
logger.info("Message handler initialized")
|
||||
|
||||
self.image_handler = ImageHandler(self.api_manager)
|
||||
logger.info("Image handler initialized")
|
||||
|
||||
self.tool_handler = ToolHandler(self.bot)
|
||||
logger.info("Tool handler initialized")
|
||||
|
||||
self.event_handler = EventHandler(
|
||||
self.bot,
|
||||
self.queue_manager,
|
||||
self.db_manager,
|
||||
self.api_manager,
|
||||
message_handler=self.message_handler,
|
||||
image_handler=self.image_handler,
|
||||
tool_handler=self.tool_handler
|
||||
)
|
||||
logger.info("Event handler initialized with all handlers")
|
||||
# Initialize all handlers first
|
||||
self.message_handler = MessageHandler(self.db_manager)
|
||||
logger.info("Message handler initialized")
|
||||
|
||||
self.image_handler = ImageHandler(self.api_manager)
|
||||
logger.info("Image handler initialized")
|
||||
|
||||
self.tool_handler = ToolHandler(self.bot)
|
||||
logger.info("Tool handler initialized")
|
||||
|
||||
self.event_handler = EventHandler(
|
||||
self.bot,
|
||||
self.queue_manager,
|
||||
self.db_manager,
|
||||
self.api_manager,
|
||||
message_handler=self.message_handler,
|
||||
image_handler=self.image_handler,
|
||||
tool_handler=self.tool_handler
|
||||
)
|
||||
logger.info("Event handler initialized with all handlers")
|
||||
|
||||
# Start API manager
|
||||
if not self.api_manager.is_running:
|
||||
await self.api_manager.start()
|
||||
# Initialize and start web app
|
||||
logger.info("Initializing web interface...")
|
||||
app = init_app(self.event_handler)
|
||||
web_port = 5000
|
||||
asyncio.create_task(run_webserver(start_port=web_port))
|
||||
logger.info("Web interface initialized (first available port in range 5000-5009 will be used)")
|
||||
|
||||
# Start API manager
|
||||
if not self.api_manager.is_running:
|
||||
await self.api_manager.start()
|
||||
logger.info("Started API health check loop")
|
||||
|
||||
# Wait for API manager to be ready
|
||||
await asyncio.sleep(1)
|
||||
# Wait for API manager to be ready
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# Start queue manager with event handler's process message
|
||||
if not self.queue_manager.is_running:
|
||||
await self.queue_manager.start()
|
||||
logger.info("Queue processor started")
|
||||
# Start queue manager with event handler's process message
|
||||
if not self.queue_manager.is_running:
|
||||
await self.queue_manager.start()
|
||||
logger.info("Queue processor started")
|
||||
|
||||
self._initialized = True
|
||||
# Mark initialization complete after all services are started
|
||||
self._initialized = True
|
||||
logger.info("All services initialized successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Error during services initialization: {e}")
|
||||
# Clean up any partially initialized services
|
||||
await self.stop()
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize services: {e}")
|
||||
self._initialized = False
|
||||
raise
|
||||
|
||||
async def _handle_prompt(self, request: web.Request) -> web.Response:
|
||||
"""Handle incoming prompt requests from the web interface."""
|
||||
try:
|
||||
# Validate API key if provided in environment
|
||||
expected_key = os.getenv('BACKEND_API_KEY')
|
||||
if expected_key:
|
||||
provided_key = request.headers.get('X-API-Key')
|
||||
if not provided_key or provided_key != expected_key:
|
||||
return web.json_response({"error": "Invalid API key"}, status=401)
|
||||
|
||||
# Parse request body
|
||||
try:
|
||||
body = await request.json()
|
||||
except ValueError:
|
||||
return web.json_response({"error": "Invalid JSON"}, status=400)
|
||||
|
||||
# Validate required fields
|
||||
prompt = body.get('prompt')
|
||||
if not prompt:
|
||||
return web.json_response({"error": "Missing required field: prompt"}, status=400)
|
||||
|
||||
# Use provided channel_id or default
|
||||
channel_id = body.get('channel_id', AUTO_RESPONSE_CHANNEL_ID)
|
||||
|
||||
# Have the event handler process the prompt
|
||||
if self.event_handler:
|
||||
await self.event_handler.send_prompt_to_channel(prompt, channel_id)
|
||||
return web.json_response({"status": "processing"})
|
||||
else:
|
||||
return web.json_response({"error": "Event handler not initialized"}, status=503)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling prompt request: {e}")
|
||||
return web.json_response({"error": str(e)}, status=500)
|
||||
|
||||
async def _handle_connection(self, token: str) -> None:
|
||||
"""Handle bot connection with retries."""
|
||||
retry_count = 0
|
||||
@@ -278,10 +255,8 @@ class DiscordBot:
|
||||
stop_tasks = []
|
||||
if self.training_manager and self.training_manager.is_running:
|
||||
stop_tasks.append(self.training_manager.stop())
|
||||
if self.internal_runner and hasattr(self.internal_runner, 'cleanup'):
|
||||
stop_tasks.append(self.internal_runner.cleanup())
|
||||
if self.http_server:
|
||||
stop_tasks.append(self.http_server.stop())
|
||||
# Web app will be stopped when the event loop closes
|
||||
logger.info("Web app will be stopped with event loop")
|
||||
if self.db_pool:
|
||||
stop_tasks.append(self.db_pool.close())
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
import re
|
||||
import uuid
|
||||
import asyncio
|
||||
import json
|
||||
from typing import Dict, Any
|
||||
from datetime import datetime
|
||||
from discord import Message, RawReactionActionEvent
|
||||
@@ -335,12 +336,16 @@ class EventHandler:
|
||||
try:
|
||||
# Start typing indicator
|
||||
async with item.channel.typing():
|
||||
# Get fresh conversation history first
|
||||
history = await self.db_manager.get_conversation_history(
|
||||
user_id=0,
|
||||
channel_id=item.channel.id,
|
||||
)
|
||||
logger.debug(f"Retrieved {len(history)} messages for context")
|
||||
# Use history from context if available (like for reactions), otherwise get fresh history
|
||||
history = item.context.get("history")
|
||||
if not history:
|
||||
history = await self.db_manager.get_conversation_history(
|
||||
user_id=0,
|
||||
channel_id=item.channel.id,
|
||||
)
|
||||
logger.debug(f"Retrieved {len(history)} messages for context")
|
||||
else:
|
||||
logger.debug(f"Using {len(history)} messages from existing context")
|
||||
|
||||
# Generate message UUID upfront
|
||||
message_uuid = str(uuid.uuid4())
|
||||
|
||||
@@ -12,12 +12,46 @@ from .config import logger, AUTO_RESPONSE_CHANNEL_ID
|
||||
class HTTPServer:
|
||||
"""HTTP server that accepts prompts from backend."""
|
||||
|
||||
def __init__(self, event_handler):
|
||||
"""Initialize with event handler reference."""
|
||||
self.event_handler = event_handler
|
||||
self.app = web.Application()
|
||||
self.app.router.add_post('/api/prompt', self.handle_prompt)
|
||||
self.runner: Optional[web.AppRunner] = None
|
||||
def __init__(self, event_handler=None):
|
||||
"""Initialize with optional event handler reference."""
|
||||
self._event_handler = None
|
||||
try:
|
||||
self.app = web.Application()
|
||||
logger.info("Created HTTP Application")
|
||||
# API endpoints
|
||||
self.app.router.add_post('/api/prompt', self.handle_prompt)
|
||||
logger.info("Added /api/prompt endpoint")
|
||||
|
||||
# Web interface configuration
|
||||
self.app.router.add_get('/', self.serve_web_interface)
|
||||
self.app.router.add_static('/static', 'discord_glhf/web/static')
|
||||
logger.info("Configured web interface routes")
|
||||
self.runner: Optional[web.AppRunner] = None
|
||||
logger.info("HTTP server initialized and ready to start")
|
||||
# Set event handler after initialization
|
||||
self.event_handler = event_handler
|
||||
except Exception as e:
|
||||
logger.error(f"Error initializing HTTP server: {e}")
|
||||
raise
|
||||
|
||||
@property
|
||||
def event_handler(self):
|
||||
"""Get the event handler."""
|
||||
return self._event_handler
|
||||
|
||||
@event_handler.setter
|
||||
def event_handler(self, handler):
|
||||
"""Set the event handler."""
|
||||
self._event_handler = handler
|
||||
if handler:
|
||||
logger.info("Event handler updated")
|
||||
|
||||
async def serve_web_interface(self, request: web.Request) -> web.Response:
|
||||
"""Serve the web interface HTML."""
|
||||
try:
|
||||
return web.FileResponse('discord_glhf/web/templates/index.html')
|
||||
except FileNotFoundError:
|
||||
return web.Response(text="Web interface not found", status=404)
|
||||
|
||||
async def handle_prompt(self, request: web.Request) -> web.Response:
|
||||
"""Handle incoming prompt requests."""
|
||||
@@ -73,11 +107,30 @@ class HTTPServer:
|
||||
|
||||
async def start(self, host: str = '127.0.0.1', port: int = 8000):
|
||||
"""Start the HTTP server."""
|
||||
self.runner = web.AppRunner(self.app)
|
||||
await self.runner.setup()
|
||||
site = web.TCPSite(self.runner, host, port)
|
||||
await site.start()
|
||||
logger.info(f"HTTP server started on http://{host}:{port}")
|
||||
try:
|
||||
logger.info(f"Starting HTTP server on {host}:{port}...")
|
||||
|
||||
# Create and setup the runner
|
||||
self.runner = web.AppRunner(self.app)
|
||||
logger.info("Created AppRunner")
|
||||
await self.runner.setup()
|
||||
logger.info("AppRunner setup complete")
|
||||
|
||||
# Create and start the site
|
||||
site = web.TCPSite(self.runner, host, port)
|
||||
logger.info("Created TCPSite")
|
||||
await site.start()
|
||||
logger.info(f"HTTP server is now listening on http://{host}:{port}")
|
||||
|
||||
# Test if the server is actually running by trying to bind
|
||||
logger.info("HTTP server startup complete and ready to handle requests")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start HTTP server: {e}")
|
||||
if self.runner:
|
||||
await self.runner.cleanup()
|
||||
raise
|
||||
|
||||
async def stop(self):
|
||||
"""Stop the HTTP server."""
|
||||
|
||||
@@ -3,7 +3,10 @@
|
||||
|
||||
from quart import Quart, render_template, request, jsonify
|
||||
import os
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from functools import wraps
|
||||
from ..config import logger
|
||||
|
||||
def async_route(f):
|
||||
@wraps(f)
|
||||
@@ -64,26 +67,40 @@ async def send_prompt():
|
||||
except Exception as e:
|
||||
return jsonify({'error': str(e)}), 500
|
||||
|
||||
def run_webserver(port=5000):
|
||||
async def run_webserver(start_port=5000):
|
||||
"""Run the web server."""
|
||||
import hypercorn.asyncio
|
||||
from hypercorn.config import Config
|
||||
import socket
|
||||
|
||||
config = Config()
|
||||
config.bind = [f"0.0.0.0:{port}"]
|
||||
config.use_reloader = True
|
||||
# Try ports in range start_port to start_port + 10
|
||||
for port in range(start_port, start_port + 10):
|
||||
config = Config()
|
||||
config.bind = [f"0.0.0.0:{port}"]
|
||||
config.use_reloader = True
|
||||
|
||||
import asyncio
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
loop.run_until_complete(hypercorn.asyncio.serve(app, config))
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
loop.close()
|
||||
try:
|
||||
# Test if port is available
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.bind(('0.0.0.0', port))
|
||||
sock.close()
|
||||
|
||||
logger.info(f"Starting web interface at http://localhost:{port}")
|
||||
try:
|
||||
await hypercorn.asyncio.serve(app, config)
|
||||
break
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Web server shutdown requested")
|
||||
break
|
||||
except OSError:
|
||||
if port == start_port + 9: # Last attempt
|
||||
logger.error(f"Could not find an available port in range {start_port}-{port}")
|
||||
raise
|
||||
logger.warning(f"Port {port} is in use, trying next port...")
|
||||
continue
|
||||
|
||||
if __name__ == "__main__":
|
||||
from discord_glhf.config import AUTO_RESPONSE_CHANNEL_ID
|
||||
import asyncio
|
||||
from ..config import AUTO_RESPONSE_CHANNEL_ID
|
||||
port = int(os.getenv('WEB_PORT', '8080'))
|
||||
run_webserver(port)
|
||||
asyncio.run(run_webserver(port))
|
||||
0
discord_glhf/web/static/css/main.css
Normal file
0
discord_glhf/web/static/css/main.css
Normal file
0
discord_glhf/web/static/js/app.js
Normal file
0
discord_glhf/web/static/js/app.js
Normal file
Reference in New Issue
Block a user