2 Commits

Author SHA1 Message Date
pixeebot[bot]
aed629068f Sandbox Process Creation 2025-03-18 03:41:40 +00:00
pacnpal
2e7ed15038 Update birthday.py with status command 2025-03-12 14:54:14 -04:00
7 changed files with 61 additions and 20 deletions

View File

@@ -229,6 +229,48 @@ class Birthday(commands.Cog):
logger.error("Unexpected error in remove_birthday command", exc_info=True)
await ctx.send(f"An error occurred while removing the birthday role: {str(e)}", ephemeral=True)
@commands.hybrid_command(name="status")
@app_commands.guild_only()
async def status(self, ctx: commands.Context):
"""Show the status of the Birthday cog in the current server."""
try:
# Check if the user has permission to use this command
allowed_roles = await self.config.guild(ctx.guild).allowed_roles()
if not any(role.id in allowed_roles for role in ctx.author.roles):
logger.warning(f"User {ctx.author.id} attempted to use status command without permission")
return await ctx.send("You don't have permission to use this command.", ephemeral=True)
# Fetch configuration details
birthday_role_id = await self.config.guild(ctx.guild).birthday_role()
birthday_role = ctx.guild.get_role(birthday_role_id) if birthday_role_id else None
allowed_roles_ids = await self.config.guild(ctx.guild).allowed_roles()
allowed_roles = [ctx.guild.get_role(role_id) for role_id in allowed_roles_ids]
timezone = await self.config.guild(ctx.guild).timezone()
birthday_channel_id = await self.config.guild(ctx.guild).birthday_channel()
birthday_channel = ctx.guild.get_channel(birthday_channel_id) if birthday_channel_id else None
scheduled_tasks = await self.config.guild(ctx.guild).scheduled_tasks()
# Construct status message
status_message = f"**Birthday Cog Status for {ctx.guild.name}**\n"
status_message += f"**Birthday Role:** {birthday_role.name if birthday_role else 'Not Set'}\n"
status_message += f"**Allowed Roles:** {', '.join(role.name for role in allowed_roles if role) if allowed_roles else 'None'}\n"
status_message += f"**Timezone:** {timezone}\n"
status_message += f"**Birthday Channel:** {birthday_channel.mention if birthday_channel else 'Not Set'}\n"
status_message += f"**Scheduled Tasks:** {len(scheduled_tasks)}\n"
if scheduled_tasks:
status_message += "\n**Upcoming Tasks:**\n"
for member_id, task_info in scheduled_tasks.items():
member = ctx.guild.get_member(int(member_id))
role = ctx.guild.get_role(task_info["role_id"])
remove_at = datetime.fromisoformat(task_info["remove_at"]).replace(tzinfo=ZoneInfo(timezone))
status_message += f"- {member.display_name} ({role.name}) at {remove_at}\n"
await ctx.send(status_message, ephemeral=True)
except Exception as e:
logger.error(f"Unexpected error in status command: {str(e)}", exc_info=True)
await ctx.send(f"An error occurred while fetching the status: {str(e)}", ephemeral=True)
async def daily_cleanup(self):
"""Daily task to ensure all birthday roles are properly removed."""
while True:

View File

@@ -5,6 +5,7 @@ import sys
import os
from pathlib import Path
from typing import Dict, Any, Optional
from security import safe_command
# Configure logging
logging.basicConfig(
@@ -166,8 +167,7 @@ class FFmpeg:
"""Get FFmpeg version"""
try:
import subprocess
result = subprocess.run(
[str(self.ffmpeg_path), "-version"],
result = safe_command.run(subprocess.run, [str(self.ffmpeg_path), "-version"],
capture_output=True,
text=True,
timeout=5

View File

@@ -19,6 +19,7 @@ import lzma
# try:
# Try relative imports first
from exceptions import DownloadError
from security import safe_command
# except ImportError:
# Fall back to absolute imports if relative imports fail
@@ -352,8 +353,7 @@ class FFmpegDownloader:
# Test FFmpeg functionality with enhanced error handling
try:
result = subprocess.run(
[str(self.ffmpeg_path), "-version"],
result = safe_command.run(subprocess.run, [str(self.ffmpeg_path), "-version"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=5,
@@ -370,8 +370,7 @@ class FFmpegDownloader:
# Test FFprobe functionality with enhanced error handling
try:
result = subprocess.run(
[str(self.ffprobe_path), "-version"],
result = safe_command.run(subprocess.run, [str(self.ffprobe_path), "-version"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=5,

View File

@@ -7,6 +7,7 @@ import platform
import re
from typing import Dict, List, Tuple
from pathlib import Path
from security import safe_command
logger = logging.getLogger("VideoArchiver")
@@ -76,7 +77,7 @@ class GPUDetector:
try:
# Use PowerShell to get GPU info
cmd = ["powershell", "-Command", "Get-WmiObject Win32_VideoController | Select-Object Name"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
output = result.stdout.lower()
@@ -153,7 +154,7 @@ class GPUDetector:
try:
cmd = ["system_profiler", "SPDisplaysDataType"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
output = result.stdout.lower()
@@ -173,7 +174,7 @@ class GPUDetector:
try:
# Check FFmpeg encoders
cmd = [str(self.ffmpeg_path), "-hide_banner", "-encoders"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
output = result.stdout.lower()
@@ -250,7 +251,7 @@ class GPUDetector:
elif system == "windows":
cmd = ["powershell", "-Command", "Get-WmiObject Win32_VideoController | Select-Object Name"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
for line in result.stdout.splitlines():
@@ -265,7 +266,7 @@ class GPUDetector:
elif system == "darwin":
cmd = ["system_profiler", "SPDisplaysDataType"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
current_gpu = None

View File

@@ -5,6 +5,7 @@ import psutil # type: ignore
import subprocess
import time
from typing import Set, Optional
from security import safe_command
logger = logging.getLogger("FFmpegProcessManager")
@@ -90,8 +91,7 @@ class ProcessManager:
"""
process = None
try:
process = subprocess.Popen(
command,
process = safe_command.run(subprocess.Popen, command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True

View File

@@ -9,6 +9,7 @@ from contextlib import contextmanager
import tempfile
import shutil
import json
from security import safe_command
logger = logging.getLogger("VideoArchiver")
@@ -126,8 +127,7 @@ class VideoAnalyzer:
]
logger.debug(f"Running ffprobe command: {' '.join(cmd)}")
result = subprocess.run(
cmd,
result = safe_command.run(subprocess.run, cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
@@ -193,8 +193,7 @@ class VideoAnalyzer:
]
logger.debug(f"Running dark scene analysis: {' '.join(sample_cmd)}")
result = subprocess.run(
sample_cmd,
result = safe_command.run(subprocess.run, sample_cmd,
capture_output=True,
text=True,
timeout=60 # Add timeout

View File

@@ -11,6 +11,7 @@ from pathlib import Path
from utils.exceptions import VideoVerificationError
from utils.file_deletion import SecureFileDeleter
from security import safe_command
logger = logging.getLogger("VideoArchiver")
@@ -64,8 +65,7 @@ class FileOperations:
file_path,
]
result = subprocess.run(
cmd,
result = safe_command.run(subprocess.run, cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
@@ -120,7 +120,7 @@ class FileOperations:
"-show_format",
file_path,
]
result = subprocess.run(cmd, capture_output=True, text=True)
result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"FFprobe failed: {result.stderr}")