From 9dfb00f4abaf652cf01c42908d78f6f319d5cb54 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Tue, 3 Sep 2024 15:56:29 +1000 Subject: [PATCH] chat_archiver: Logic for checking and downloading media links --- chat_archiver/chat_archiver/main.py | 80 +++++++++++++++++++++++++++-- common/common/media.py | 8 ++- docker-compose.jsonnet | 8 ++- 3 files changed, 91 insertions(+), 5 deletions(-) diff --git a/chat_archiver/chat_archiver/main.py b/chat_archiver/chat_archiver/main.py index ff922b7..7d4d051 100644 --- a/chat_archiver/chat_archiver/main.py +++ b/chat_archiver/chat_archiver/main.py @@ -5,6 +5,7 @@ import json import logging import os import random +import re import string import signal import socket @@ -19,6 +20,7 @@ import gevent.queue from common import atomic_write, listdir from common.chat import BATCH_INTERVAL, format_batch, get_batch_files, merge_messages +from common.media import download_media, FailedResponse, WrongContent, Rejected from girc import Client from monotonic import monotonic @@ -115,12 +117,13 @@ merge_pass_merges = prom.Histogram( ) class Archiver(object): - def __init__(self, name, base_dir, channels, nick, oauth_token): + def __init__(self, name, base_dir, channels, nick, oauth_token, download_media): self.logger = logging.getLogger(type(self).__name__).getChild(name) self.name = name self.messages = gevent.queue.Queue() self.channels = channels self.base_dir = base_dir + self.download_media = download_media self.stopping = gevent.event.Event() self.got_reconnect = gevent.event.Event() @@ -252,6 +255,9 @@ class Archiver(object): emote_ids = [emote_spec.split(':')[0] for emote_spec in emote_specs] ensure_emotes(self.base_dir, emote_ids) + if self.download_media and data['command'] == "PRIVMSG" and len(data["params"]) == 2: + ensure_image_links(self.base_dir, data["params"][1]) + if data['tags'] and 'tmi-sent-ts' in data['tags']: # explicit server time is available timestamp = int(data['tags']['tmi-sent-ts']) / 1000. 
# original is int ms @@ -344,6 +350,74 @@ def wait_for_ensure_emotes(): _EMOTES_RUNNING.wait() +URL_REGEX = re.compile(r""" + # Previous char is not a letter. This prevents eg. "foohttp://example.com" + # Also disallows / as the previous character, otherwise "file:///foo.bar/baz" + # can match on the "foo.bar/baz" part. + (?<! \w | / ) + # Optional scheme, which must be http or https + (?P<scheme> https?:// )? + # Hostname, which must contain a dot. Single-part hostnames like "localhost" are valid + # but we don't want to match them, and this avoids cases like "yes/no" matching. + # We enforce that the TLD is not fully numeric. No TLDs currently look like this + # (though it does end up forbidding raw IPv4 addresses), and a common false-positive + # is "1.5/10" or similar. + ( [a-z0-9-]+ \. )+ [a-z][a-z0-9-]+ + # Optional port + ( : [0-9]+ )? + # Optional path. We assume a path character can be anything that's not completely disallowed + # but don't try to parse it further into query, fragment etc. + # We also include all unicode characters considered "letters" since it's likely someone might + # put a ö or something in a path and copy-paste it from their browser URL bar which renders it + # like that even though it's encoded when actually sent as a URL. + # Restricting this to letters prevents things like non-breaking spaces causing problems. + # For the same reason we also allow {} and [] which seem to show up often in paths. + (?P<path> / (\w | [!#$%&'()*+,./:;=?@_~{}-] | \[ | \] )* )? +""", re.ASCII | re.VERBOSE | re.IGNORECASE) + + +_IMAGE_LINKS_RUNNING = KeyedGroup() +def ensure_image_links(base_dir, text): + """Find any image or video links in the text and download them if we don't have them already. + This happens in the background and errors are ignored.""" + media_dir = os.path.join(base_dir, "media") + + def get_url(url): + try: + try: + download_media(url, media_dir) + except FailedResponse: + # We got a 404 or similar. + # Attempt to remove any stray punctuation from the url and try again. + # We only try this once. 
+ if url.endswith("..."): + url = url[:-3] + elif not url[-1].isalnum(): + url = url[:-1] + else: + # No punctuation found, let the original result stand + raise + download_media(url, media_dir) + except WrongContent as e: + logging.info(f"Ignoring non-media link {url}: {e}") + except Rejected as e: + logging.warning(f"Rejected dangerous link {url}: {e}") + except Exception: + logging.warning(f"Unable to fetch link {url}", exc_info=True) + + for match in URL_REGEX.finditer(text): + # Don't match on bare hostnames with no scheme AND no path. ie. + # http://example.com SHOULD match + # example.com/foo SHOULD match + # example.com SHOULD NOT match + # Otherwise we get a false positive every time someone says "fart.wav" or similar. + if match.group("scheme") is None and match.group("path") is None: + continue + url = match.group(0) + key = (media_dir, url) + _IMAGE_LINKS_RUNNING.spawn(key, get_url, url) + + def write_batch(path, batch_time, messages, size_histogram=None): """Batches are named PATH/YYYY-MM-DDTHH/MM:SS-HASH.json""" output = (format_batch(messages) + '\n').encode('utf-8') @@ -452,7 +526,7 @@ def merge_batch_files(path, batch_time): os.remove(batch_file) -def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008): +def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008, download_media=False): with open(oauth_token_path) as f: oauth_token = f.read() # To ensure uniqueness even if multiple instances are running on the same host, @@ -478,7 +552,7 @@ def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_in logging.info("Starting") for index in count(): # To ensure uniqueness between clients, include a client number - archiver = Archiver("{}.{}".format(name, index), base_dir, channels, nick, oauth_token) + archiver = Archiver("{}.{}".format(name, index), base_dir, channels, nick, oauth_token, download_media) archive_worker = 
gevent.spawn(archiver.run) workers = mergers + [archive_worker] # wait for either graceful exit, error, or for a signal from the archiver that a reconnect was requested diff --git a/common/common/media.py b/common/common/media.py index 86093dc..7350742 100644 --- a/common/common/media.py +++ b/common/common/media.py @@ -51,6 +51,9 @@ class BadScheme(Rejected): class WrongContent(Rejected): """Response was not a video or image""" +class FailedResponse(Rejected): + """Got a 4xx response, probably a bad link""" + def check_for_media(output_dir, url): """Returns True if we have at least one version of content for the given url already.""" @@ -219,7 +222,10 @@ def _request(url, max_size, content_types): if resp.get_redirect_location(): return resp - if resp.status != 200: + # 4xx errors are non-retryable, anything else is. + if 400 <= resp.status < 500: + raise FailedResponse(f"Url returned {resp.status} response: {url}") + elif not (200 <= resp.status < 300): raise Exception(f"Url returned {resp.status} response: {url}") content_type = resp.getheader("content-type") diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet index b30a21d..a70c26d 100644 --- a/docker-compose.jsonnet +++ b/docker-compose.jsonnet @@ -220,6 +220,8 @@ token_path: "./chat_token.txt", // Whether to enable backfilling of chat archives to this node (if backfiller enabled) backfill: true, + // Whether to enable downloading of media (images, videos, etc) that is posted in chat. + download_media: true, // Channels to watch. Defaults to "all twitch channels in $.channels" but you can add extras. 
channels: [ std.split(c, '!')[0] @@ -614,7 +616,11 @@ [if $.enabled.chat_archiver then "chat_archiver"]: { image: $.get_image("chat_archiver"), restart: "always", - command: [$.chat_archiver.user, "/token"] + $.chat_archiver.channels + ["--name", $.localhost], + command: + [$.chat_archiver.user, "/token"] + + $.chat_archiver.channels + + ["--name", $.localhost] + + (if $.chat_archiver.download_media then ["--download-media"] else []), volumes: ["%s:/mnt" % $.segments_path, "%s:/token" % $.chat_archiver.token_path], [if "chat_archiver" in $.ports then "ports"]: ["%s:8008" % $.ports.chat_archiver], environment: $.env,