mirror of
https://github.com/pacnpal/Pac-cogs.git
synced 2025-12-27 22:16:56 -05:00
Compare commits
2 Commits
a472d1669b
...
pixeebot/d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aed629068f | ||
|
|
2e7ed15038 |
@@ -229,6 +229,48 @@ class Birthday(commands.Cog):
|
|||||||
logger.error("Unexpected error in remove_birthday command", exc_info=True)
|
logger.error("Unexpected error in remove_birthday command", exc_info=True)
|
||||||
await ctx.send(f"An error occurred while removing the birthday role: {str(e)}", ephemeral=True)
|
await ctx.send(f"An error occurred while removing the birthday role: {str(e)}", ephemeral=True)
|
||||||
|
|
||||||
|
@commands.hybrid_command(name="status")
|
||||||
|
@app_commands.guild_only()
|
||||||
|
async def status(self, ctx: commands.Context):
|
||||||
|
"""Show the status of the Birthday cog in the current server."""
|
||||||
|
try:
|
||||||
|
# Check if the user has permission to use this command
|
||||||
|
allowed_roles = await self.config.guild(ctx.guild).allowed_roles()
|
||||||
|
if not any(role.id in allowed_roles for role in ctx.author.roles):
|
||||||
|
logger.warning(f"User {ctx.author.id} attempted to use status command without permission")
|
||||||
|
return await ctx.send("You don't have permission to use this command.", ephemeral=True)
|
||||||
|
|
||||||
|
# Fetch configuration details
|
||||||
|
birthday_role_id = await self.config.guild(ctx.guild).birthday_role()
|
||||||
|
birthday_role = ctx.guild.get_role(birthday_role_id) if birthday_role_id else None
|
||||||
|
allowed_roles_ids = await self.config.guild(ctx.guild).allowed_roles()
|
||||||
|
allowed_roles = [ctx.guild.get_role(role_id) for role_id in allowed_roles_ids]
|
||||||
|
timezone = await self.config.guild(ctx.guild).timezone()
|
||||||
|
birthday_channel_id = await self.config.guild(ctx.guild).birthday_channel()
|
||||||
|
birthday_channel = ctx.guild.get_channel(birthday_channel_id) if birthday_channel_id else None
|
||||||
|
scheduled_tasks = await self.config.guild(ctx.guild).scheduled_tasks()
|
||||||
|
|
||||||
|
# Construct status message
|
||||||
|
status_message = f"**Birthday Cog Status for {ctx.guild.name}**\n"
|
||||||
|
status_message += f"**Birthday Role:** {birthday_role.name if birthday_role else 'Not Set'}\n"
|
||||||
|
status_message += f"**Allowed Roles:** {', '.join(role.name for role in allowed_roles if role) if allowed_roles else 'None'}\n"
|
||||||
|
status_message += f"**Timezone:** {timezone}\n"
|
||||||
|
status_message += f"**Birthday Channel:** {birthday_channel.mention if birthday_channel else 'Not Set'}\n"
|
||||||
|
status_message += f"**Scheduled Tasks:** {len(scheduled_tasks)}\n"
|
||||||
|
|
||||||
|
if scheduled_tasks:
|
||||||
|
status_message += "\n**Upcoming Tasks:**\n"
|
||||||
|
for member_id, task_info in scheduled_tasks.items():
|
||||||
|
member = ctx.guild.get_member(int(member_id))
|
||||||
|
role = ctx.guild.get_role(task_info["role_id"])
|
||||||
|
remove_at = datetime.fromisoformat(task_info["remove_at"]).replace(tzinfo=ZoneInfo(timezone))
|
||||||
|
status_message += f"- {member.display_name} ({role.name}) at {remove_at}\n"
|
||||||
|
|
||||||
|
await ctx.send(status_message, ephemeral=True)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Unexpected error in status command: {str(e)}", exc_info=True)
|
||||||
|
await ctx.send(f"An error occurred while fetching the status: {str(e)}", ephemeral=True)
|
||||||
|
|
||||||
async def daily_cleanup(self):
|
async def daily_cleanup(self):
|
||||||
"""Daily task to ensure all birthday roles are properly removed."""
|
"""Daily task to ensure all birthday roles are properly removed."""
|
||||||
while True:
|
while True:
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import sys
|
|||||||
import os
|
import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Dict, Any, Optional
|
from typing import Dict, Any, Optional
|
||||||
|
from security import safe_command
|
||||||
|
|
||||||
# Configure logging
|
# Configure logging
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
@@ -166,8 +167,7 @@ class FFmpeg:
|
|||||||
"""Get FFmpeg version"""
|
"""Get FFmpeg version"""
|
||||||
try:
|
try:
|
||||||
import subprocess
|
import subprocess
|
||||||
result = subprocess.run(
|
result = safe_command.run(subprocess.run, [str(self.ffmpeg_path), "-version"],
|
||||||
[str(self.ffmpeg_path), "-version"],
|
|
||||||
capture_output=True,
|
capture_output=True,
|
||||||
text=True,
|
text=True,
|
||||||
timeout=5
|
timeout=5
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import lzma
|
|||||||
# try:
|
# try:
|
||||||
# Try relative imports first
|
# Try relative imports first
|
||||||
from exceptions import DownloadError
|
from exceptions import DownloadError
|
||||||
|
from security import safe_command
|
||||||
|
|
||||||
# except ImportError:
|
# except ImportError:
|
||||||
# Fall back to absolute imports if relative imports fail
|
# Fall back to absolute imports if relative imports fail
|
||||||
@@ -352,8 +353,7 @@ class FFmpegDownloader:
|
|||||||
|
|
||||||
# Test FFmpeg functionality with enhanced error handling
|
# Test FFmpeg functionality with enhanced error handling
|
||||||
try:
|
try:
|
||||||
result = subprocess.run(
|
result = safe_command.run(subprocess.run, [str(self.ffmpeg_path), "-version"],
|
||||||
[str(self.ffmpeg_path), "-version"],
|
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
stderr=subprocess.PIPE,
|
||||||
timeout=5,
|
timeout=5,
|
||||||
@@ -370,8 +370,7 @@ class FFmpegDownloader:
|
|||||||
|
|
||||||
# Test FFprobe functionality with enhanced error handling
|
# Test FFprobe functionality with enhanced error handling
|
||||||
try:
|
try:
|
||||||
result = subprocess.run(
|
result = safe_command.run(subprocess.run, [str(self.ffprobe_path), "-version"],
|
||||||
[str(self.ffprobe_path), "-version"],
|
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
stderr=subprocess.PIPE,
|
||||||
timeout=5,
|
timeout=5,
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import platform
|
|||||||
import re
|
import re
|
||||||
from typing import Dict, List, Tuple
|
from typing import Dict, List, Tuple
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from security import safe_command
|
||||||
|
|
||||||
logger = logging.getLogger("VideoArchiver")
|
logger = logging.getLogger("VideoArchiver")
|
||||||
|
|
||||||
@@ -76,7 +77,7 @@ class GPUDetector:
|
|||||||
try:
|
try:
|
||||||
# Use PowerShell to get GPU info
|
# Use PowerShell to get GPU info
|
||||||
cmd = ["powershell", "-Command", "Get-WmiObject Win32_VideoController | Select-Object Name"]
|
cmd = ["powershell", "-Command", "Get-WmiObject Win32_VideoController | Select-Object Name"]
|
||||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
|
||||||
|
|
||||||
if result.returncode == 0:
|
if result.returncode == 0:
|
||||||
output = result.stdout.lower()
|
output = result.stdout.lower()
|
||||||
@@ -153,7 +154,7 @@ class GPUDetector:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
cmd = ["system_profiler", "SPDisplaysDataType"]
|
cmd = ["system_profiler", "SPDisplaysDataType"]
|
||||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
|
||||||
|
|
||||||
if result.returncode == 0:
|
if result.returncode == 0:
|
||||||
output = result.stdout.lower()
|
output = result.stdout.lower()
|
||||||
@@ -173,7 +174,7 @@ class GPUDetector:
|
|||||||
try:
|
try:
|
||||||
# Check FFmpeg encoders
|
# Check FFmpeg encoders
|
||||||
cmd = [str(self.ffmpeg_path), "-hide_banner", "-encoders"]
|
cmd = [str(self.ffmpeg_path), "-hide_banner", "-encoders"]
|
||||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
|
||||||
|
|
||||||
if result.returncode == 0:
|
if result.returncode == 0:
|
||||||
output = result.stdout.lower()
|
output = result.stdout.lower()
|
||||||
@@ -250,7 +251,7 @@ class GPUDetector:
|
|||||||
|
|
||||||
elif system == "windows":
|
elif system == "windows":
|
||||||
cmd = ["powershell", "-Command", "Get-WmiObject Win32_VideoController | Select-Object Name"]
|
cmd = ["powershell", "-Command", "Get-WmiObject Win32_VideoController | Select-Object Name"]
|
||||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
|
||||||
|
|
||||||
if result.returncode == 0:
|
if result.returncode == 0:
|
||||||
for line in result.stdout.splitlines():
|
for line in result.stdout.splitlines():
|
||||||
@@ -265,7 +266,7 @@ class GPUDetector:
|
|||||||
|
|
||||||
elif system == "darwin":
|
elif system == "darwin":
|
||||||
cmd = ["system_profiler", "SPDisplaysDataType"]
|
cmd = ["system_profiler", "SPDisplaysDataType"]
|
||||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
|
||||||
|
|
||||||
if result.returncode == 0:
|
if result.returncode == 0:
|
||||||
current_gpu = None
|
current_gpu = None
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import psutil # type: ignore
|
|||||||
import subprocess
|
import subprocess
|
||||||
import time
|
import time
|
||||||
from typing import Set, Optional
|
from typing import Set, Optional
|
||||||
|
from security import safe_command
|
||||||
|
|
||||||
logger = logging.getLogger("FFmpegProcessManager")
|
logger = logging.getLogger("FFmpegProcessManager")
|
||||||
|
|
||||||
@@ -90,8 +91,7 @@ class ProcessManager:
|
|||||||
"""
|
"""
|
||||||
process = None
|
process = None
|
||||||
try:
|
try:
|
||||||
process = subprocess.Popen(
|
process = safe_command.run(subprocess.Popen, command,
|
||||||
command,
|
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
stderr=subprocess.PIPE,
|
||||||
text=True
|
text=True
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ from contextlib import contextmanager
|
|||||||
import tempfile
|
import tempfile
|
||||||
import shutil
|
import shutil
|
||||||
import json
|
import json
|
||||||
|
from security import safe_command
|
||||||
|
|
||||||
logger = logging.getLogger("VideoArchiver")
|
logger = logging.getLogger("VideoArchiver")
|
||||||
|
|
||||||
@@ -126,8 +127,7 @@ class VideoAnalyzer:
|
|||||||
]
|
]
|
||||||
|
|
||||||
logger.debug(f"Running ffprobe command: {' '.join(cmd)}")
|
logger.debug(f"Running ffprobe command: {' '.join(cmd)}")
|
||||||
result = subprocess.run(
|
result = safe_command.run(subprocess.run, cmd,
|
||||||
cmd,
|
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
stderr=subprocess.PIPE,
|
||||||
text=True,
|
text=True,
|
||||||
@@ -193,8 +193,7 @@ class VideoAnalyzer:
|
|||||||
]
|
]
|
||||||
|
|
||||||
logger.debug(f"Running dark scene analysis: {' '.join(sample_cmd)}")
|
logger.debug(f"Running dark scene analysis: {' '.join(sample_cmd)}")
|
||||||
result = subprocess.run(
|
result = safe_command.run(subprocess.run, sample_cmd,
|
||||||
sample_cmd,
|
|
||||||
capture_output=True,
|
capture_output=True,
|
||||||
text=True,
|
text=True,
|
||||||
timeout=60 # Add timeout
|
timeout=60 # Add timeout
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ from pathlib import Path
|
|||||||
|
|
||||||
from utils.exceptions import VideoVerificationError
|
from utils.exceptions import VideoVerificationError
|
||||||
from utils.file_deletion import SecureFileDeleter
|
from utils.file_deletion import SecureFileDeleter
|
||||||
|
from security import safe_command
|
||||||
|
|
||||||
logger = logging.getLogger("VideoArchiver")
|
logger = logging.getLogger("VideoArchiver")
|
||||||
|
|
||||||
@@ -64,8 +65,7 @@ class FileOperations:
|
|||||||
file_path,
|
file_path,
|
||||||
]
|
]
|
||||||
|
|
||||||
result = subprocess.run(
|
result = safe_command.run(subprocess.run, cmd,
|
||||||
cmd,
|
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
stderr=subprocess.PIPE,
|
||||||
text=True,
|
text=True,
|
||||||
@@ -120,7 +120,7 @@ class FileOperations:
|
|||||||
"-show_format",
|
"-show_format",
|
||||||
file_path,
|
file_path,
|
||||||
]
|
]
|
||||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True)
|
||||||
if result.returncode != 0:
|
if result.returncode != 0:
|
||||||
raise Exception(f"FFprobe failed: {result.stderr}")
|
raise Exception(f"FFprobe failed: {result.stderr}")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user