chat_archiver: Logic for checking and downloading media links

pull/408/head
Mike Lang committed 3 months ago
parent 2855ec759d
commit 9dfb00f4ab

@@ -5,6 +5,7 @@ import json
 import logging
 import os
 import random
+import re
 import string
 import signal
 import socket
@@ -19,6 +20,7 @@ import gevent.queue
 from common import atomic_write, listdir
 from common.chat import BATCH_INTERVAL, format_batch, get_batch_files, merge_messages
+from common.media import download_media, FailedResponse, WrongContent, Rejected
 from girc import Client
 from monotonic import monotonic
@@ -115,12 +117,13 @@ merge_pass_merges = prom.Histogram(
 )
 
 class Archiver(object):
-	def __init__(self, name, base_dir, channels, nick, oauth_token):
+	def __init__(self, name, base_dir, channels, nick, oauth_token, download_media):
 		self.logger = logging.getLogger(type(self).__name__).getChild(name)
 		self.name = name
 		self.messages = gevent.queue.Queue()
 		self.channels = channels
 		self.base_dir = base_dir
+		self.download_media = download_media
 		self.stopping = gevent.event.Event()
 		self.got_reconnect = gevent.event.Event()
@@ -252,6 +255,9 @@ class Archiver(object):
 			emote_ids = [emote_spec.split(':')[0] for emote_spec in emote_specs]
 			ensure_emotes(self.base_dir, emote_ids)
 
+		if self.download_media and data['command'] == "PRIVMSG" and len(data["params"]) == 2:
+			ensure_image_links(self.base_dir, data["params"][1])
+
 		if data['tags'] and 'tmi-sent-ts' in data['tags']:
 			# explicit server time is available
 			timestamp = int(data['tags']['tmi-sent-ts']) / 1000. # original is int ms
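
The params-length check guards against malformed messages: for a PRIVMSG, params is [target, text], so params[1] is the chat text that gets scanned for links. A purely illustrative sketch of the message shape being assumed here (values invented):

	# Illustrative only: rough shape of a parsed PRIVMSG as used above.
	data = {
		"command": "PRIVMSG",
		"params": ["#somechannel", "look at this https://example.com/cat.png"],
		"tags": {"tmi-sent-ts": "1700000000000"},  # server timestamp in ms
	}
	# data["params"][1] is the message body handed to ensure_image_links().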
@@ -344,6 +350,74 @@ def wait_for_ensure_emotes():
 	_EMOTES_RUNNING.wait()
 
+URL_REGEX = re.compile(r"""
+	# Previous char is not a letter. This prevents eg. "foohttp://example.com"
+	# Also disallows / as the previous character, otherwise "file:///foo.bar/baz"
+	# can match on the "foo.bar/baz" part.
+	(?<! \w | / )
+	# optional scheme, which must be http or https (we don't want other schemes)
+	(?P<scheme> https?:// )?
+	# Hostname, which must contain a dot. Single-part hostnames like "localhost" are valid
+	# but we don't want to match them, and this avoids cases like "yes/no" matching.
+	# We enforce that the TLD is not fully numeric. No TLDs currently look like this
+	# (though it does end up forbidding raw IPv4 addresses), and a common false-positive
+	# is "1.5/10" or similar.
+	( [a-z0-9-]+ \. )+ [a-z][a-z0-9-]+
+	# Optional port
+	( : [0-9]+ )?
+	# Optional path. We assume a path character can be anything that's not completely disallowed
+	# but don't try to parse it further into query, fragment etc.
+	# We also include all unicode characters considered "letters" since it's likely someone might
+	# put a ö or something in a path and copy-paste it from their browser URL bar which renders it
+	# like that even though it's encoded when actually sent as a URL.
+	# Restricting this to letters prevents things like non-breaking spaces causing problems.
+	# For the same reason we also allow {} and [] which seem to show up often in paths.
+	(?P<path> / (\w | [!#$%&'()*+,./:;=?@_~{}-] | \[ | \] )* )?
+""", re.ASCII | re.VERBOSE | re.IGNORECASE)
+_IMAGE_LINKS_RUNNING = KeyedGroup()
+
+def ensure_image_links(base_dir, text):
+	"""Find any image or video links in the text and download them if we don't have them already.
+	This happens in the background and errors are ignored."""
+	media_dir = os.path.join(base_dir, "media")
+
+	def get_url(url):
+		try:
+			try:
+				download_media(url, media_dir)
+			except FailedResponse:
+				# We got a 404 or similar.
+				# Attempt to remove any stray punctuation from the url and try again.
+				# We only try this once.
+				if url.endswith("..."):
+					url = url[:-3]
+				elif not url[-1].isalnum():
+					url = url[:-1]
+				else:
+					# No punctuation found, let the original result stand
+					raise
+				download_media(url, media_dir)
+		except WrongContent as e:
+			logging.info(f"Ignoring non-media link {url}: {e}")
+		except Rejected as e:
+			logging.warning(f"Rejected dangerous link {url}: {e}")
+		except Exception:
+			logging.warning(f"Unable to fetch link {url}", exc_info=True)
+
+	for match in URL_REGEX.finditer(text):
+		# Don't match on bare hostnames with no scheme AND no path. ie.
+		#   http://example.com SHOULD match
+		#   example.com/foo SHOULD match
+		#   example.com SHOULD NOT match
+		# Otherwise we get a false positive every time someone says "fart.wav" or similar.
+		if match.group("scheme") is None and match.group("path") is None:
+			continue
+		url = match.group(0)
+		key = (media_dir, url)
+		_IMAGE_LINKS_RUNNING.spawn(key, get_url, url)
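
Keying on (media_dir, url) means the same link spammed many times only triggers one download per media dir. KeyedGroup itself lives elsewhere in the codebase and isn't shown in this diff; a rough, hypothetical sketch of the semantics being relied on (not the real implementation):

	import gevent

	class KeyedGroupSketch(object):
		"""Hypothetical stand-in for KeyedGroup: at most one greenlet per key.
		spawn() is a no-op while a task with the same key is still running."""
		def __init__(self):
			self.greenlets = {}

		def spawn(self, key, func, *args):
			if key not in self.greenlets:
				g = gevent.spawn(func, *args)
				# Forget the key once the task finishes, success or failure.
				g.link(lambda g, key=key: self.greenlets.pop(key, None))
				self.greenlets[key] = g
			return self.greenlets[key]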
 
 def write_batch(path, batch_time, messages, size_histogram=None):
 	"""Batches are named PATH/YYYY-MM-DDTHH/MM:SS-HASH.json"""
 	output = (format_batch(messages) + '\n').encode('utf-8')
@@ -452,7 +526,7 @@ def merge_batch_files(path, batch_time):
 		os.remove(batch_file)
 
-def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008):
+def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008, download_media=False):
 	with open(oauth_token_path) as f:
 		oauth_token = f.read()
 	# To ensure uniqueness even if multiple instances are running on the same host,
@@ -478,7 +552,7 @@ def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_in
 	logging.info("Starting")
 	for index in count():
 		# To ensure uniqueness between clients, include a client number
-		archiver = Archiver("{}.{}".format(name, index), base_dir, channels, nick, oauth_token)
+		archiver = Archiver("{}.{}".format(name, index), base_dir, channels, nick, oauth_token, download_media)
 		archive_worker = gevent.spawn(archiver.run)
 		workers = mergers + [archive_worker]
 		# wait for either graceful exit, error, or for a signal from the archiver that a reconnect was requested
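
Since main() takes download_media as a keyword argument and the jsonnet change below passes it as a --download-media flag, a hand-run invocation would presumably look something like this (entrypoint name, nick, token path and channel all invented for illustration):

	chat_archiver somenick /token somechannel --name somehost --download-media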

@@ -51,6 +51,9 @@ class BadScheme(Rejected):
 class WrongContent(Rejected):
 	"""Response was not a video or image"""
 
+class FailedResponse(Rejected):
+	"""Got a 4xx response, probably a bad link"""
+
 def check_for_media(output_dir, url):
 	"""Returns True if we have at least one version of content for the given url already."""
@@ -219,7 +222,10 @@ def _request(url, max_size, content_types):
 	if resp.get_redirect_location():
 		return resp
 
-	if resp.status != 200:
+	# 4xx errors are non-retryable, anything else is.
+	if 400 <= resp.status < 500:
+		raise FailedResponse(f"Url returned {resp.status} response: {url}")
+	elif not (200 <= resp.status < 300):
 		raise Exception(f"Url returned {resp.status} response: {url}")
 
 	content_type = resp.getheader("content-type")
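
The point of the FailedResponse split is retryability: a 4xx is treated as permanently bad, while any other non-2xx raises a plain Exception that callers may retry. A hypothetical retrying caller (not part of this codebase), using only the names from this diff:

	import logging
	import time

	from common.media import download_media, Rejected

	def fetch_with_retries(url, media_dir, attempts=3):
		"""Hypothetical helper: retry transient failures, give up at once on 4xx."""
		for attempt in range(attempts):
			try:
				return download_media(url, media_dir)
			except Rejected:
				# Covers FailedResponse too (it subclasses Rejected): a bad link,
				# wrong content type or dangerous scheme won't improve on retry.
				raise
			except Exception:
				if attempt + 1 == attempts:
					raise
				logging.warning(f"Attempt {attempt + 1} failed for {url}, retrying", exc_info=True)
				time.sleep(2 ** attempt)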

@@ -220,6 +220,8 @@
 	token_path: "./chat_token.txt",
 	// Whether to enable backfilling of chat archives to this node (if backfiller enabled)
 	backfill: true,
+	// Whether to enable downloading of media (images, videos, etc) that is posted in chat.
+	download_media: true,
 	// Channels to watch. Defaults to "all twitch channels in $.channels" but you can add extras.
 	channels: [
 		std.split(c, '!')[0]
@@ -614,7 +616,11 @@
 	[if $.enabled.chat_archiver then "chat_archiver"]: {
 		image: $.get_image("chat_archiver"),
 		restart: "always",
-		command: [$.chat_archiver.user, "/token"] + $.chat_archiver.channels + ["--name", $.localhost],
+		command:
+			[$.chat_archiver.user, "/token"]
+			+ $.chat_archiver.channels
+			+ ["--name", $.localhost]
+			+ (if $.chat_archiver.download_media then ["--download-media"] else []),
 		volumes: ["%s:/mnt" % $.segments_path, "%s:/token" % $.chat_archiver.token_path],
 		[if "chat_archiver" in $.ports then "ports"]: ["%s:8008" % $.ports.chat_archiver],
 		environment: $.env,
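
For illustration, with invented values (user "botnick", a single channel "somechannel", localhost "node1") and download_media left at its default of true, the command array above evaluates to:

	["botnick", "/token", "somechannel", "--name", "node1", "--download-media"]

With download_media: false the trailing flag is simply omitted, preserving the old behaviour.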
