mirror of
https://github.com/thewesker/Video-Archive-Discord-Bot.git
synced 2025-12-19 20:01:15 -05:00
Add validation to only download for supported domains
This commit is contained in:
22
main.py
22
main.py
@@ -4,6 +4,7 @@ import ffmpeg
|
|||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from downloader import download
|
from downloader import download
|
||||||
from compressionMessages import getCompressionMessage
|
from compressionMessages import getCompressionMessage
|
||||||
|
from validator import extractUrl, isSupportedUrl
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
@@ -58,10 +59,29 @@ async def on_message(message):
|
|||||||
if message.content.startswith('$hello'):
|
if message.content.startswith('$hello'):
|
||||||
await message.channel.send('Hello!')
|
await message.channel.send('Hello!')
|
||||||
|
|
||||||
|
# Extract and validate the request
|
||||||
|
extractResponse = extractUrl(message.content)
|
||||||
|
url = extractResponse["url"]
|
||||||
|
messages = extractResponse['messages']
|
||||||
|
if(messages.startswith("Error")):
|
||||||
|
await message.channel.send('TikBot encountered an error determing a URL. Consider berating my human if this was not expected.\nMessage: ' + messages)
|
||||||
|
return
|
||||||
|
|
||||||
|
print("Got URL: " + url + " For User: " + str(message.author))
|
||||||
|
if('🤖' not in message.content):
|
||||||
|
# Validate unless we've been reqeuested not to
|
||||||
|
validateResponse = isSupportedUrl(url)
|
||||||
|
messages = validateResponse['messages']
|
||||||
|
if(messages.startswith("Error")):
|
||||||
|
await message.channel.send('TikBot encountered an error validating the URL. Consider berating my human if this was not expected.\nMessage: ' + messages)
|
||||||
|
return
|
||||||
|
if(validateResponse['supported'] == 'false'):
|
||||||
|
# Unsupported URL, return silently without doing anything
|
||||||
|
return
|
||||||
|
|
||||||
if message.content.startswith('https'):
|
if message.content.startswith('https'):
|
||||||
await message.channel.send('TikBot downloading video now!')
|
await message.channel.send('TikBot downloading video now!')
|
||||||
downloadResponse = download(message.content)
|
downloadResponse = download(url)
|
||||||
fileName = downloadResponse['fileName']
|
fileName = downloadResponse['fileName']
|
||||||
duration = downloadResponse['duration']
|
duration = downloadResponse['duration']
|
||||||
messages = downloadResponse['messages']
|
messages = downloadResponse['messages']
|
||||||
|
|||||||
17
tests.py
Normal file
17
tests.py
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
import unittest
|
||||||
|
from validator import extractUrl, isSupportedUrl
|
||||||
|
|
||||||
|
class TestUrlParser(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_supportedUrl(self):
|
||||||
|
url = "https://vm.tiktok.com/ZSJrgyXdt/"
|
||||||
|
supportedResponse = isSupportedUrl(url)
|
||||||
|
self.assertEqual(supportedResponse["supported"], 'true')
|
||||||
|
|
||||||
|
def test_unsupportedUrl(self):
|
||||||
|
url = "https://www.twitch.tv/robcdee/clip/AgileLivelyCucumberPartyTime"
|
||||||
|
supportedResponse = isSupportedUrl(url)
|
||||||
|
self.assertEqual(supportedResponse["supported"], 'false')
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
||||||
31
validator.py
Normal file
31
validator.py
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
import re
|
||||||
|
|
||||||
|
def extractUrl(inputString):
|
||||||
|
response = {'url': '', 'messages': ''}
|
||||||
|
|
||||||
|
# Grubers regex pattern https://gist.github.com/gruber/8891611
|
||||||
|
pattern = r"""(?i)\b((?:https?:(?:/{1,3}|[a-z0-9%])|[a-z0-9.\-]+[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)/)(?:[^\s()<>{}\[\]]+|\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\))+(?:\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\)|[^\s`!()\[\]{};:'".,<>?«»“”‘’])|(?:(?<!@)[a-z0-9]+(?:[.\-][a-z0-9]+)*[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)\b/?(?!@)))"""
|
||||||
|
|
||||||
|
firstUrl = re.search(pattern, inputString).group(0)
|
||||||
|
if(firstUrl != ''):
|
||||||
|
response['url'] = firstUrl
|
||||||
|
else:
|
||||||
|
response['messages'] = "Information: Unable to find a valid URL in your input. Please supply a URL starting with http."
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
def isSupportedUrl(url):
|
||||||
|
response = {'url': '', 'supported': 'false', 'messages': ''}
|
||||||
|
|
||||||
|
supportedDomains = ['youtube', 'tiktok', 'instagram']
|
||||||
|
|
||||||
|
for domain in supportedDomains:
|
||||||
|
if(domain in url):
|
||||||
|
response['supported'] = 'true'
|
||||||
|
return response
|
||||||
|
|
||||||
|
# We only reach here if the URL isn't supported
|
||||||
|
response['messages'] = "Information: Supplied URL is not a supported domain. To force TikBot to attempt it anyway, include a '🤖' in your message."
|
||||||
|
print(response['messages'])
|
||||||
|
return response
|
||||||
|
|
||||||
Reference in New Issue
Block a user