1 Commits

Author SHA1 Message Date
pixeebot[bot]
7f9c992341 Sandbox Process Creation 2024-12-18 03:23:12 +00:00
10 changed files with 43 additions and 212 deletions

View File

@@ -1,61 +0,0 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.
# This workflow checks out code, performs a Codacy security scan
# and integrates the results with the
# GitHub Advanced Security code scanning feature. For more information on
# the Codacy security scan action usage and parameters, see
# https://github.com/codacy/codacy-analysis-cli-action.
# For more information on Codacy Analysis CLI in general, see
# https://github.com/codacy/codacy-analysis-cli.
name: Codacy Security Scan
on:
push:
branches: [ "main" ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ "main" ]
schedule:
- cron: '36 1 * * 4'
permissions:
contents: read
jobs:
codacy-security-scan:
permissions:
contents: read # for actions/checkout to fetch code
security-events: write # for github/codeql-action/upload-sarif to upload SARIF results
actions: read # only required for a private repository by github/codeql-action/upload-sarif to get the Action run status
name: Codacy Security Scan
runs-on: ubuntu-latest
steps:
# Checkout the repository to the GitHub Actions runner
- name: Checkout code
uses: actions/checkout@v4
# Execute Codacy Analysis CLI and generate a SARIF output with the security issues identified during the analysis
- name: Run Codacy Analysis CLI
uses: codacy/codacy-analysis-cli-action@d840f886c4bd4edc059706d09c6a1586111c540b
with:
# Check https://github.com/codacy/codacy-analysis-cli#project-token to get your project token from your Codacy repository
# You can also omit the token and run the tools that support default configurations
project-token: ${{ secrets.CODACY_PROJECT_TOKEN }}
verbose: true
output: results.sarif
format: sarif
# Adjust severity of non-security issues
gh-code-scanning-compat: true
# Force 0 exit code to allow SARIF file generation
# This will handover control about PR rejection to the GitHub side
max-allowed-issues: 2147483647
# Upload the SARIF file generated in the previous step
- name: Upload SARIF results file
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: results.sarif

View File

@@ -1,65 +0,0 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.
# Frogbot Scan and Fix does the following:
# Automatically creates pull requests with fixes for vulnerable project dependencies.
# Uses JFrog Xray to scan the project.
# Read more about Frogbot here - https://docs.jfrog-applications.jfrog.io/jfrog-applications/frogbot
# Some projects require creating a frogbot-config.yml file. Read more about it here - https://docs.jfrog-applications.jfrog.io/jfrog-applications/frogbot/setup-frogbot/frogbot-configuration
name: "Frogbot Scan and Fix"
on:
push:
branches: [ "main" ]
permissions:
contents: write
pull-requests: write
security-events: write
jobs:
create-fix-pull-requests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: jfrog/frogbot@5d9c42c30f1169d8be4ba5510b40e75ffcbbc2a9 # v2.21.2
env:
# [Mandatory if the two conditions below are met]
# 1. The project uses npm, yarn 2, NuGet or .NET to download its dependencies
# 2. The `installCommand` variable isn't set in your frogbot-config.yml file.
#
# The command that installs the project dependencies (e.g "npm i", "nuget restore" or "dotnet restore")
# JF_INSTALL_DEPS_CMD: ""
# [Mandatory]
# JFrog platform URL
JF_URL: ${{ secrets.JF_URL }}
# [Mandatory if JF_USER and JF_PASSWORD are not provided]
# JFrog access token with 'read' permissions on Xray service
JF_ACCESS_TOKEN: ${{ secrets.JF_ACCESS_TOKEN }}
# [Mandatory if JF_ACCESS_TOKEN is not provided]
# JFrog username with 'read' permissions for Xray. Must be provided with JF_PASSWORD
# JF_USER: ${{ secrets.JF_USER }}
# [Mandatory if JF_ACCESS_TOKEN is not provided]
# JFrog password. Must be provided with JF_USER
# JF_PASSWORD: ${{ secrets.JF_PASSWORD }}
# [Mandatory]
# The GitHub token automatically generated for the job
JF_GIT_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# [Optional]
# If the machine that runs Frogbot has no access to the internat, set the name of a remote repository
# in Artifactory, which proxies https://releases.jfrog.io/artifactory
# The 'frogbot' executable and other tools it needs will be downloaded through this repository.
# JF_RELEASES_REPO: ""
# [Optional]
# Frogbot will download the project dependencies, if they're not cached locally. To download the
# dependencies from a virtual repository in Artifactory, set the name of of the repository. There's no
# need to set this value, if it is set in the frogbot-config.yml file.
# JF_DEPS_REPO: ""

View File

@@ -37,7 +37,7 @@ async def birthday_context_menu(interaction: discord.Interaction, member: discor
birthday_role = interaction.guild.get_role(birthday_role_id) birthday_role = interaction.guild.get_role(birthday_role_id)
if not birthday_role: if not birthday_role:
logger.error("Birthday role not found in the guild") logger.error(f"Birthday role {birthday_role_id} not found in guild {interaction.guild.id}")
return await interaction.followup.send("The birthday role doesn't exist anymore. Please ask an admin to set it again.", ephemeral=True) return await interaction.followup.send("The birthday role doesn't exist anymore. Please ask an admin to set it again.", ephemeral=True)
# Assign the role, ignoring hierarchy # Assign the role, ignoring hierarchy
@@ -45,10 +45,10 @@ async def birthday_context_menu(interaction: discord.Interaction, member: discor
await member.add_roles(birthday_role, reason="Birthday role") await member.add_roles(birthday_role, reason="Birthday role")
logger.info(f"Birthday role assigned to {member.id} in guild {interaction.guild.id}") logger.info(f"Birthday role assigned to {member.id} in guild {interaction.guild.id}")
except discord.Forbidden: except discord.Forbidden:
logger.error("Failed to assign birthday role: Insufficient permissions") logger.error(f"Failed to assign birthday role to {member.id} in guild {interaction.guild.id}: Insufficient permissions")
return await interaction.followup.send("I don't have permission to assign that role.", ephemeral=True) return await interaction.followup.send("I don't have permission to assign that role.", ephemeral=True)
except discord.HTTPException as e: except discord.HTTPException as e:
logger.error(f"Failed to assign birthday role: {str(e)}") logger.error(f"Failed to assign birthday role to {member.id} in guild {interaction.guild.id}: {str(e)}")
return await interaction.followup.send("Failed to assign the birthday role due to a Discord error.", ephemeral=True) return await interaction.followup.send("Failed to assign the birthday role due to a Discord error.", ephemeral=True)
# Generate birthday message with random cakes (or pie) # Generate birthday message with random cakes (or pie)
@@ -63,7 +63,7 @@ async def birthday_context_menu(interaction: discord.Interaction, member: discor
if birthday_channel_id: if birthday_channel_id:
channel = interaction.client.get_channel(birthday_channel_id) channel = interaction.client.get_channel(birthday_channel_id)
if not channel: if not channel:
logger.warning("Birthday channel not found in the guild") logger.warning(f"Birthday channel {birthday_channel_id} not found in guild {interaction.guild.id}")
channel = interaction.channel channel = interaction.channel
else: else:
channel = interaction.channel channel = interaction.channel
@@ -76,7 +76,7 @@ async def birthday_context_menu(interaction: discord.Interaction, member: discor
try: try:
tz = ZoneInfo(timezone) tz = ZoneInfo(timezone)
except ZoneInfoNotFoundError: except ZoneInfoNotFoundError:
logger.warning("Invalid timezone for the guild, defaulting to UTC") logger.warning(f"Invalid timezone {timezone} for guild {interaction.guild.id}, defaulting to UTC")
await interaction.followup.send("Warning: Invalid timezone set. Defaulting to UTC.", ephemeral=True) await interaction.followup.send("Warning: Invalid timezone set. Defaulting to UTC.", ephemeral=True)
tz = ZoneInfo("UTC") tz = ZoneInfo("UTC")
@@ -196,12 +196,12 @@ class Birthday(commands.Cog):
birthday_role_id = await self.config.guild(ctx.guild).birthday_role() birthday_role_id = await self.config.guild(ctx.guild).birthday_role()
if not birthday_role_id: if not birthday_role_id:
logger.error("Birthday role not set for the guild") logger.error(f"Birthday role not set for guild {ctx.guild.id}")
return await ctx.send("The birthday role hasn't been set.", ephemeral=True) return await ctx.send("The birthday role hasn't been set.", ephemeral=True)
birthday_role = ctx.guild.get_role(birthday_role_id) birthday_role = ctx.guild.get_role(birthday_role_id)
if not birthday_role: if not birthday_role:
logger.error("Birthday role not found in the guild") logger.error(f"Birthday role {birthday_role_id} not found in guild {ctx.guild.id}")
return await ctx.send("The birthday role doesn't exist anymore.", ephemeral=True) return await ctx.send("The birthday role doesn't exist anymore.", ephemeral=True)
if birthday_role not in member.roles: if birthday_role not in member.roles:
@@ -209,12 +209,12 @@ class Birthday(commands.Cog):
try: try:
await member.remove_roles(birthday_role, reason="Birthday role manually removed") await member.remove_roles(birthday_role, reason="Birthday role manually removed")
logger.info(f"Birthday role manually removed from member {member.id}") logger.info(f"Birthday role manually removed from {member.id} in guild {ctx.guild.id}")
except discord.Forbidden: except discord.Forbidden:
logger.error(f"Failed to remove birthday role from member {member.id}: Insufficient permissions") logger.error(f"Failed to remove birthday role from {member.id} in guild {ctx.guild.id}: Insufficient permissions")
return await ctx.send("I don't have permission to remove that role.", ephemeral=True) return await ctx.send("I don't have permission to remove that role.", ephemeral=True)
except discord.HTTPException as e: except discord.HTTPException as e:
logger.error(f"Failed to remove birthday role from member {member.id} due to a Discord error: {str(e)}") logger.error(f"Failed to remove birthday role from {member.id} in guild {ctx.guild.id}: {str(e)}")
return await ctx.send("Failed to remove the birthday role due to a Discord error.", ephemeral=True) return await ctx.send("Failed to remove the birthday role due to a Discord error.", ephemeral=True)
# Remove scheduled task if it exists # Remove scheduled task if it exists
@@ -226,50 +226,8 @@ class Birthday(commands.Cog):
await ctx.send(f"Birthday role removed from {member.display_name}!", ephemeral=True) await ctx.send(f"Birthday role removed from {member.display_name}!", ephemeral=True)
except Exception as e: except Exception as e:
logger.error("Unexpected error in remove_birthday command", exc_info=True) logger.error(f"Unexpected error in remove_birthday command: {str(e)}", exc_info=True)
await ctx.send(f"An error occurred while removing the birthday role: {str(e)}", ephemeral=True) await ctx.send(f"An error occurred while removing the birthday role: {str(e)}", ephemeral=True)
@commands.hybrid_command(name="status")
@app_commands.guild_only()
async def status(self, ctx: commands.Context):
"""Show the status of the Birthday cog in the current server."""
try:
# Check if the user has permission to use this command
allowed_roles = await self.config.guild(ctx.guild).allowed_roles()
if not any(role.id in allowed_roles for role in ctx.author.roles):
logger.warning(f"User {ctx.author.id} attempted to use status command without permission")
return await ctx.send("You don't have permission to use this command.", ephemeral=True)
# Fetch configuration details
birthday_role_id = await self.config.guild(ctx.guild).birthday_role()
birthday_role = ctx.guild.get_role(birthday_role_id) if birthday_role_id else None
allowed_roles_ids = await self.config.guild(ctx.guild).allowed_roles()
allowed_roles = [ctx.guild.get_role(role_id) for role_id in allowed_roles_ids]
timezone = await self.config.guild(ctx.guild).timezone()
birthday_channel_id = await self.config.guild(ctx.guild).birthday_channel()
birthday_channel = ctx.guild.get_channel(birthday_channel_id) if birthday_channel_id else None
scheduled_tasks = await self.config.guild(ctx.guild).scheduled_tasks()
# Construct status message
status_message = f"**Birthday Cog Status for {ctx.guild.name}**\n"
status_message += f"**Birthday Role:** {birthday_role.name if birthday_role else 'Not Set'}\n"
status_message += f"**Allowed Roles:** {', '.join(role.name for role in allowed_roles if role) if allowed_roles else 'None'}\n"
status_message += f"**Timezone:** {timezone}\n"
status_message += f"**Birthday Channel:** {birthday_channel.mention if birthday_channel else 'Not Set'}\n"
status_message += f"**Scheduled Tasks:** {len(scheduled_tasks)}\n"
if scheduled_tasks:
status_message += "\n**Upcoming Tasks:**\n"
for member_id, task_info in scheduled_tasks.items():
member = ctx.guild.get_member(int(member_id))
role = ctx.guild.get_role(task_info["role_id"])
remove_at = datetime.fromisoformat(task_info["remove_at"]).replace(tzinfo=ZoneInfo(timezone))
status_message += f"- {member.display_name} ({role.name}) at {remove_at}\n"
await ctx.send(status_message, ephemeral=True)
except Exception as e:
logger.error(f"Unexpected error in status command: {str(e)}", exc_info=True)
await ctx.send(f"An error occurred while fetching the status: {str(e)}", ephemeral=True)
async def daily_cleanup(self): async def daily_cleanup(self):
"""Daily task to ensure all birthday roles are properly removed.""" """Daily task to ensure all birthday roles are properly removed."""
@@ -390,12 +348,12 @@ class Birthday(commands.Cog):
birthday_role_id = await self.config.guild(ctx.guild).birthday_role() birthday_role_id = await self.config.guild(ctx.guild).birthday_role()
if not birthday_role_id: if not birthday_role_id:
logger.error("Birthday role not set for the guild") logger.error(f"Birthday role not set for guild {ctx.guild.id}")
return await ctx.send("The birthday role hasn't been set. An admin needs to set it using `/setrole`.", ephemeral=True) return await ctx.send("The birthday role hasn't been set. An admin needs to set it using `/setrole`.", ephemeral=True)
birthday_role = ctx.guild.get_role(birthday_role_id) birthday_role = ctx.guild.get_role(birthday_role_id)
if not birthday_role: if not birthday_role:
logger.error("Birthday role not found in the guild") logger.error(f"Birthday role {birthday_role_id} not found in guild {ctx.guild.id}")
return await ctx.send("The birthday role doesn't exist anymore. Please ask an admin to set it again.", ephemeral=True) return await ctx.send("The birthday role doesn't exist anymore. Please ask an admin to set it again.", ephemeral=True)
# Assign the role, ignoring hierarchy # Assign the role, ignoring hierarchy
@@ -403,10 +361,10 @@ class Birthday(commands.Cog):
await member.add_roles(birthday_role, reason="Birthday role") await member.add_roles(birthday_role, reason="Birthday role")
logger.info(f"Birthday role assigned to {member.id} in guild {ctx.guild.id}") logger.info(f"Birthday role assigned to {member.id} in guild {ctx.guild.id}")
except discord.Forbidden: except discord.Forbidden:
logger.error("Failed to assign birthday role: Insufficient permissions") logger.error(f"Failed to assign birthday role to {member.id} in guild {ctx.guild.id}: Insufficient permissions")
return await ctx.send("I don't have permission to assign that role.", ephemeral=True) return await ctx.send("I don't have permission to assign that role.", ephemeral=True)
except discord.HTTPException as e: except discord.HTTPException as e:
logger.error(f"Failed to assign birthday role: {str(e)}") logger.error(f"Failed to assign birthday role to {member.id} in guild {ctx.guild.id}: {str(e)}")
return await ctx.send("Failed to assign the birthday role due to a Discord error.", ephemeral=True) return await ctx.send("Failed to assign the birthday role due to a Discord error.", ephemeral=True)
# Generate birthday message with random cakes (or pie) # Generate birthday message with random cakes (or pie)
@@ -421,7 +379,7 @@ class Birthday(commands.Cog):
if birthday_channel_id: if birthday_channel_id:
channel = self.bot.get_channel(birthday_channel_id) channel = self.bot.get_channel(birthday_channel_id)
if not channel: # If the set channel doesn't exist anymore if not channel: # If the set channel doesn't exist anymore
logger.warning("Birthday channel not found in the guild") logger.warning(f"Birthday channel {birthday_channel_id} not found in guild {ctx.guild.id}")
channel = ctx.channel channel = ctx.channel
else: else:
channel = ctx.channel channel = ctx.channel
@@ -434,7 +392,7 @@ class Birthday(commands.Cog):
try: try:
tz = ZoneInfo(timezone) tz = ZoneInfo(timezone)
except ZoneInfoNotFoundError: except ZoneInfoNotFoundError:
logger.warning("Invalid timezone for the guild, defaulting to UTC") logger.warning(f"Invalid timezone {timezone} for guild {ctx.guild.id}, defaulting to UTC")
await ctx.send("Warning: Invalid timezone set. Defaulting to UTC.", ephemeral=True) await ctx.send("Warning: Invalid timezone set. Defaulting to UTC.", ephemeral=True)
tz = ZoneInfo("UTC") tz = ZoneInfo("UTC")
@@ -443,7 +401,7 @@ class Birthday(commands.Cog):
await self.schedule_birthday_role_removal(ctx.guild, member, birthday_role, midnight) await self.schedule_birthday_role_removal(ctx.guild, member, birthday_role, midnight)
except Exception as e: except Exception as e:
logger.error(f"Unexpected error in birthday command", exc_info=True) logger.error(f"Unexpected error in birthday command: {str(e)}", exc_info=True)
await ctx.send(f"An error occurred: {str(e)}", ephemeral=True) await ctx.send(f"An error occurred: {str(e)}", ephemeral=True)
@commands.hybrid_command(name="bdaycheck") @commands.hybrid_command(name="bdaycheck")

View File

@@ -5,6 +5,7 @@ import sys
import os import os
from pathlib import Path from pathlib import Path
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
from security import safe_command
# Configure logging # Configure logging
logging.basicConfig( logging.basicConfig(
@@ -166,8 +167,7 @@ class FFmpeg:
"""Get FFmpeg version""" """Get FFmpeg version"""
try: try:
import subprocess import subprocess
result = subprocess.run( result = safe_command.run(subprocess.run, [str(self.ffmpeg_path), "-version"],
[str(self.ffmpeg_path), "-version"],
capture_output=True, capture_output=True,
text=True, text=True,
timeout=5 timeout=5

View File

@@ -3,6 +3,7 @@
import os import os
import logging import logging
import shutil import shutil
import requests
import tarfile import tarfile
import zipfile import zipfile
import subprocess import subprocess
@@ -18,7 +19,7 @@ import lzma
# try: # try:
# Try relative imports first # Try relative imports first
from exceptions import DownloadError from exceptions import DownloadError
from security import safe_requests from security import safe_command
# except ImportError: # except ImportError:
# Fall back to absolute imports if relative imports fail # Fall back to absolute imports if relative imports fail
@@ -175,7 +176,7 @@ class FFmpegDownloader:
logger.info(f"Downloading FFmpeg from {url}") logger.info(f"Downloading FFmpeg from {url}")
try: try:
response = safe_requests.get(url, stream=True, timeout=30) response = requests.get(url, stream=True, timeout=30)
response.raise_for_status() response.raise_for_status()
total_size = int(response.headers.get("content-length", 0)) total_size = int(response.headers.get("content-length", 0))
@@ -352,8 +353,7 @@ class FFmpegDownloader:
# Test FFmpeg functionality with enhanced error handling # Test FFmpeg functionality with enhanced error handling
try: try:
result = subprocess.run( result = safe_command.run(subprocess.run, [str(self.ffmpeg_path), "-version"],
[str(self.ffmpeg_path), "-version"],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
timeout=5, timeout=5,
@@ -370,8 +370,7 @@ class FFmpegDownloader:
# Test FFprobe functionality with enhanced error handling # Test FFprobe functionality with enhanced error handling
try: try:
result = subprocess.run( result = safe_command.run(subprocess.run, [str(self.ffprobe_path), "-version"],
[str(self.ffprobe_path), "-version"],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
timeout=5, timeout=5,

View File

@@ -7,6 +7,7 @@ import platform
import re import re
from typing import Dict, List, Tuple from typing import Dict, List, Tuple
from pathlib import Path from pathlib import Path
from security import safe_command
logger = logging.getLogger("VideoArchiver") logger = logging.getLogger("VideoArchiver")
@@ -76,7 +77,7 @@ class GPUDetector:
try: try:
# Use PowerShell to get GPU info # Use PowerShell to get GPU info
cmd = ["powershell", "-Command", "Get-WmiObject Win32_VideoController | Select-Object Name"] cmd = ["powershell", "-Command", "Get-WmiObject Win32_VideoController | Select-Object Name"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0: if result.returncode == 0:
output = result.stdout.lower() output = result.stdout.lower()
@@ -153,7 +154,7 @@ class GPUDetector:
try: try:
cmd = ["system_profiler", "SPDisplaysDataType"] cmd = ["system_profiler", "SPDisplaysDataType"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0: if result.returncode == 0:
output = result.stdout.lower() output = result.stdout.lower()
@@ -173,7 +174,7 @@ class GPUDetector:
try: try:
# Check FFmpeg encoders # Check FFmpeg encoders
cmd = [str(self.ffmpeg_path), "-hide_banner", "-encoders"] cmd = [str(self.ffmpeg_path), "-hide_banner", "-encoders"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0: if result.returncode == 0:
output = result.stdout.lower() output = result.stdout.lower()
@@ -250,7 +251,7 @@ class GPUDetector:
elif system == "windows": elif system == "windows":
cmd = ["powershell", "-Command", "Get-WmiObject Win32_VideoController | Select-Object Name"] cmd = ["powershell", "-Command", "Get-WmiObject Win32_VideoController | Select-Object Name"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0: if result.returncode == 0:
for line in result.stdout.splitlines(): for line in result.stdout.splitlines():
@@ -265,7 +266,7 @@ class GPUDetector:
elif system == "darwin": elif system == "darwin":
cmd = ["system_profiler", "SPDisplaysDataType"] cmd = ["system_profiler", "SPDisplaysDataType"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0: if result.returncode == 0:
current_gpu = None current_gpu = None

View File

@@ -5,6 +5,7 @@ import psutil # type: ignore
import subprocess import subprocess
import time import time
from typing import Set, Optional from typing import Set, Optional
from security import safe_command
logger = logging.getLogger("FFmpegProcessManager") logger = logging.getLogger("FFmpegProcessManager")
@@ -90,8 +91,7 @@ class ProcessManager:
""" """
process = None process = None
try: try:
process = subprocess.Popen( process = safe_command.run(subprocess.Popen, command,
command,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
text=True text=True

View File

@@ -9,6 +9,7 @@ from contextlib import contextmanager
import tempfile import tempfile
import shutil import shutil
import json import json
from security import safe_command
logger = logging.getLogger("VideoArchiver") logger = logging.getLogger("VideoArchiver")
@@ -126,8 +127,7 @@ class VideoAnalyzer:
] ]
logger.debug(f"Running ffprobe command: {' '.join(cmd)}") logger.debug(f"Running ffprobe command: {' '.join(cmd)}")
result = subprocess.run( result = safe_command.run(subprocess.run, cmd,
cmd,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, text=True,
@@ -193,8 +193,7 @@ class VideoAnalyzer:
] ]
logger.debug(f"Running dark scene analysis: {' '.join(sample_cmd)}") logger.debug(f"Running dark scene analysis: {' '.join(sample_cmd)}")
result = subprocess.run( result = safe_command.run(subprocess.run, sample_cmd,
sample_cmd,
capture_output=True, capture_output=True,
text=True, text=True,
timeout=60 # Add timeout timeout=60 # Add timeout

View File

@@ -1,7 +1,7 @@
"""Constants for VideoProcessor""" """Constants for VideoProcessor"""
from typing import Dict, List, Union from typing import Dict, List, Union
from dataclasses import field, dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
class ReactionType(Enum): class ReactionType(Enum):
@@ -27,9 +27,9 @@ class ReactionEmojis:
@dataclass(frozen=True) @dataclass(frozen=True)
class ProgressEmojis: class ProgressEmojis:
"""Emoji sequences for progress indicators""" """Emoji sequences for progress indicators"""
NUMBERS: List[str] = field(default_factory=lambda: ('1', '2', '3', '4', '5')) NUMBERS: List[str] = ('1', '2', '3', '4', '5')
PROGRESS: List[str] = field(default_factory=lambda: ('', '🟨', '🟩')) PROGRESS: List[str] = ('', '🟨', '🟩')
DOWNLOAD: List[str] = field(default_factory=lambda: ('0', '2', '4', '6', '8', '🔟')) DOWNLOAD: List[str] = ('0', '2', '4', '6', '8', '🔟')
# Main reactions dictionary with type hints # Main reactions dictionary with type hints
REACTIONS: Dict[str, Union[str, List[str]]] = { REACTIONS: Dict[str, Union[str, List[str]]] = {

View File

@@ -11,6 +11,7 @@ from pathlib import Path
from utils.exceptions import VideoVerificationError from utils.exceptions import VideoVerificationError
from utils.file_deletion import SecureFileDeleter from utils.file_deletion import SecureFileDeleter
from security import safe_command
logger = logging.getLogger("VideoArchiver") logger = logging.getLogger("VideoArchiver")
@@ -64,8 +65,7 @@ class FileOperations:
file_path, file_path,
] ]
result = subprocess.run( result = safe_command.run(subprocess.run, cmd,
cmd,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, text=True,
@@ -120,7 +120,7 @@ class FileOperations:
"-show_format", "-show_format",
file_path, file_path,
] ]
result = subprocess.run(cmd, capture_output=True, text=True) result = safe_command.run(subprocess.run, cmd, capture_output=True, text=True)
if result.returncode != 0: if result.returncode != 0:
raise Exception(f"FFprobe failed: {result.stderr}") raise Exception(f"FFprobe failed: {result.stderr}")