chat_archiver: Logic for checking and downloading media links

Mike Lang 2 weeks ago
parent 499739053b
commit ab6c36a319

@ -5,6 +5,7 @@ import json
import logging
import os
import random
import re
import string
import signal
import socket
@ -19,6 +20,7 @@ import gevent.queue
from common import atomic_write, listdir
from common.chat import BATCH_INTERVAL, format_batch, get_batch_files, merge_messages
from common.media import download_media, FailedResponse, WrongContent, Rejected
from girc import Client
from monotonic import monotonic
@ -115,12 +117,13 @@ merge_pass_merges = prom.Histogram(
)
class Archiver(object):
def __init__(self, name, base_dir, channels, nick, oauth_token):
def __init__(self, name, base_dir, channels, nick, oauth_token, download_media):
self.logger = logging.getLogger(type(self).__name__).getChild(name)
self.name = name
self.messages = gevent.queue.Queue()
self.channels = channels
self.base_dir = base_dir
self.download_media = download_media
self.stopping = gevent.event.Event()
self.got_reconnect = gevent.event.Event()
@ -252,6 +255,9 @@ class Archiver(object):
emote_ids = [emote_spec.split(':')[0] for emote_spec in emote_specs]
ensure_emotes(self.base_dir, emote_ids)
if self.download_media and data['command'] == "PRIVMSG" and len(data["params"]) == 2:
ensure_image_links(self.base_dir, data["params"][1])
if data['tags'] and 'tmi-sent-ts' in data['tags']:
# explicit server time is available
timestamp = int(data['tags']['tmi-sent-ts']) / 1000. # original is int ms
@ -344,6 +350,74 @@ def wait_for_ensure_emotes():
_EMOTES_RUNNING.wait()
URL_REGEX = re.compile(r"""
# Previous char is not a letter. This prevents eg. "foohttp://example.com"
# Also disallows / as the previous character, otherwise "file:///foo.bar/baz"
# can match on the "foo.bar/baz" part.
(?<! \w | / )
# optional scheme, which must be http or https (we don't want other schemes)
(?P<scheme> https?:// )?
# Hostname, which must contain a dot. Single-part hostnames like "localhost" are valid
# but we don't want to match them, and this avoids cases like "yes/no" matching.
# We enforce that the TLD is not fully numeric. No TLDs currently look like this
# (though it does end up forbidding raw IPv4 addresses), and a common false-positive
# is "1.5/10" or similar.
( [a-z0-9-]+ \. )+ [a-z][a-z0-9-]+
# Optional port
( : [0-9]+ )?
# Optional path. We assume a path character can be anything that's not completely disallowed
# but don't try to parse it further into query, fragment etc.
# We also include all unicode characters considered "letters" since it's likely someone might
# put a ö or something in a path and copy-paste it from their browser URL bar which renders it
# like that even though it's encoded when actually sent as a URL.
# Restricting this to letters prevents things like non-breaking spaces causing problems.
# For the same reason we also allow {} and [] which seem to show up often in paths.
(?P<path> / (\w | [!#$%&'()*+,./:;=?@_~{}-] | \[ | \] )* )?
""", re.ASCII | re.VERBOSE | re.IGNORECASE)
_IMAGE_LINKS_RUNNING = KeyedGroup()
def ensure_image_links(base_dir, text):
"""Find any image or video links in the text and download them if we don't have them already.
This happens in the background and errors are ignored."""
media_dir = os.path.join(base_dir, "media")
def get_url(url):
try:
try:
download_media(url, media_dir)
except FailedResponse:
# We got a 404 or similar.
# Attempt to remove any stray punctuation from the url and try again.
# We only try this once.
if url.endswith("..."):
url = url[:-3]
elif not url[-1].isalnum():
url = url[:-1]
else:
# No puncuation found, let the original result stand
raise
download_media(url, media_dir)
except WrongContent as e:
logging.info(f"Ignoring non-media link {url}: {e}")
except Rejected as e:
logging.warning(f"Rejected dangerous link {url}: {e}")
except Exception:
logging.warning(f"Unable to fetch link {url}", exc_info=True)
for match in URL_REGEX.finditer(text):
# Don't match on bare hostnames with no scheme AND no path. ie.
# http://example.com SHOULD match
# example.com/foo SHOULD match
# example.com SHOULD NOT match
# Otherwise we get a false positive every time someone says "fart.wav" or similar.
if match.group("scheme") is None and match.group("path") is None:
continue
url = match.group(0)
key = (media_dir, url)
_IMAGE_LINKS_RUNNING.spawn(key, get_url, url)
def write_batch(path, batch_time, messages, size_histogram=None):
"""Batches are named PATH/YYYY-MM-DDTHH/MM:SS-HASH.json"""
output = (format_batch(messages) + '\n').encode('utf-8')
@ -452,7 +526,7 @@ def merge_batch_files(path, batch_time):
os.remove(batch_file)
def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008):
def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008, download_media=False):
with open(oauth_token_path) as f:
oauth_token = f.read()
# To ensure uniqueness even if multiple instances are running on the same host,
@ -478,7 +552,7 @@ def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_in
logging.info("Starting")
for index in count():
# To ensure uniqueness between clients, include a client number
archiver = Archiver("{}.{}".format(name, index), base_dir, channels, nick, oauth_token)
archiver = Archiver("{}.{}".format(name, index), base_dir, channels, nick, oauth_token, download_media)
archive_worker = gevent.spawn(archiver.run)
workers = mergers + [archive_worker]
# wait for either graceful exit, error, or for a signal from the archiver that a reconnect was requested

@ -51,6 +51,9 @@ class BadScheme(Rejected):
class WrongContent(Rejected):
"""Response was not a video or image"""
class FailedResponse(Rejected):
"""Got a 4xx response, probably a bad link"""
def check_for_media(output_dir, url):
"""Returns True if we have at least one version of content for the given url already."""
@ -219,7 +222,10 @@ def _request(url, max_size, content_types):
if resp.get_redirect_location():
return resp
if resp.status != 200:
# 4xx errors are non-retryable, anything else is.
if 400 <= resp.status < 500:
raise FailedResponse(f"Url returned {resp.status} response: {url}")
elif not (200 <= resp.status < 300):
raise Exception(f"Url returned {resp.status} response: {url}")
content_type = resp.getheader("content-type")

@ -220,6 +220,8 @@
token_path: "./chat_token.txt",
// Whether to enable backfilling of chat archives to this node (if backfiller enabled)
backfill: true,
// Whether to enable downloading of media (images, videos, etc) that is posted in chat.
download_media: true,
// Channels to watch. Defaults to "all twitch channels in $.channels" but you can add extras.
channels: [
std.split(c, '!')[0]
@ -614,7 +616,11 @@
[if $.enabled.chat_archiver then "chat_archiver"]: {
image: $.get_image("chat_archiver"),
restart: "always",
command: [$.chat_archiver.user, "/token"] + $.chat_archiver.channels + ["--name", $.localhost],
command:
[$.chat_archiver.user, "/token"]
+ $.chat_archiver.channels
+ ["--name", $.localhost]
+ (if $.chat_archiver.download_media then ["--download-media"] else []),
volumes: ["%s:/mnt" % $.segments_path, "%s:/token" % $.chat_archiver.token_path],
[if "chat_archiver" in $.ports then "ports"]: ["%s:8008" % $.ports.chat_archiver],
environment: $.env,

Loading…
Cancel
Save