Compare commits


9 Commits

Author SHA1 Message Date
Mike Lang 7483794a23 docker-compose: Set up sheetsync for downloading media links
This involves giving it access to the SEGMENTS/media directory.
2 months ago
Mike Lang 3e7cb38cf0 sheetsync: Optionally download media linked in image links column
To enable this, you need to:
- set --media-dir globally for sheetsync
- enable download_media=true for the events sync config
To disable for individual rows (eg. because of known issues), put "[nodownload]" in the notes column.
2 months ago
Mike Lang 7b590cf574 chat-archiver: Some cleanups to the URL matching regex
With thanks to Me-Me for review
2 months ago
Mike Lang 15f86551d4 docker-compose: Backfill of media, with or without chat archiver 2 months ago
Mike Lang 9dfb00f4ab chat_archiver: Logic for checking and downloading media links 2 months ago
Mike Lang 2855ec759d download_media: Add pdf to default allowed content types
We want to capture linked PDFs in addition to videos and images
2 months ago
Mike Lang b46c577014 download_media: Add function for checking if a URL has been downloaded before 2 months ago
Mike Lang 352c9e9081 download_media: Get data from potentially malicious URLs and store in the filesystem
This is suitable for taking arbitrary URLs from chat, etc and trying to fetch them.
It downloads them to a filepath that contains a hash of the URL and content.
2 months ago
Mike Lang 07055e3605 chat-archiver: extract the ensure_emotes greenlet management to a class 2 months ago

@@ -4,7 +4,6 @@ gevent.monkey.patch_all()
import argh
import logging
-import json
from chat_archiver.main import ensure_emotes, wait_for_ensure_emotes

@@ -0,0 +1,27 @@
"""A group of greenlets running tasks.
Each task has a key. If a task with that key is already running,
it is not re-run."""

import gevent


class KeyedGroup:
	def __init__(self):
		self.greenlets = {}

	def spawn(self, key, func, *args, **kwargs):
		if key not in self.greenlets:
			self.greenlets[key] = gevent.spawn(self._wrapper, key, func, *args, **kwargs)
		return self.greenlets[key]

	def wait(self):
		"""Blocks until all tasks started before wait() was called are finished."""
		gevent.wait(self.greenlets.values())

	def _wrapper(self, key, func, *args, **kwargs):
		try:
			return func(*args, **kwargs)
		finally:
			assert self.greenlets[key] is gevent.getcurrent()
			del self.greenlets[key]
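
For illustration, a brief usage sketch of the class above (not part of the diff; it assumes the file is importable as chat_archiver.keyed_group): spawning the same key twice only starts one greenlet, and wait() blocks until everything spawned so far has finished.

# Usage sketch for KeyedGroup (import path assumed).
import gevent
from chat_archiver.keyed_group import KeyedGroup

def fetch(name):
	gevent.sleep(0.1)
	print("fetched", name)

group = KeyedGroup()
group.spawn(("emote", "25"), fetch, "Kappa")
group.spawn(("emote", "25"), fetch, "Kappa")  # same key: no second greenlet is started
group.spawn(("emote", "33"), fetch, "DansGame")
group.wait()  # blocks until both tasks above have finished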

@@ -5,6 +5,7 @@ import json
import logging
import os
import random
+import re
import string
import signal
import socket

@@ -19,12 +20,15 @@ import gevent.queue
from common import atomic_write, listdir
from common.chat import BATCH_INTERVAL, format_batch, get_batch_files, merge_messages
+from common.media import download_media, FailedResponse, WrongContent, Rejected
from girc import Client
from monotonic import monotonic
import prometheus_client as prom
import requests

+from .keyed_group import KeyedGroup

# These are known to arrive up to MAX_DELAY after their actual time
DELAYED_COMMANDS = [

@@ -113,12 +117,13 @@ merge_pass_merges = prom.Histogram(
)


class Archiver(object):
-	def __init__(self, name, base_dir, channels, nick, oauth_token):
+	def __init__(self, name, base_dir, channels, nick, oauth_token, download_media):
		self.logger = logging.getLogger(type(self).__name__).getChild(name)
		self.name = name
		self.messages = gevent.queue.Queue()
		self.channels = channels
		self.base_dir = base_dir
+		self.download_media = download_media

		self.stopping = gevent.event.Event()
		self.got_reconnect = gevent.event.Event()

@@ -250,6 +255,9 @@ class Archiver(object):
			emote_ids = [emote_spec.split(':')[0] for emote_spec in emote_specs]
			ensure_emotes(self.base_dir, emote_ids)

+			if self.download_media and data['command'] == "PRIVMSG" and len(data["params"]) == 2:
+				ensure_image_links(self.base_dir, data["params"][1])
+
			if data['tags'] and 'tmi-sent-ts' in data['tags']:
				# explicit server time is available
				timestamp = int(data['tags']['tmi-sent-ts']) / 1000. # original is int ms
@@ -306,47 +314,108 @@ class Archiver(object):
		self.client.stop()


-_EMOTES_RUNNING = {} # map (base_dir, emote id, theme, scale) -> in-progress greenlet fetching that path
+_EMOTES_RUNNING = KeyedGroup()

def ensure_emotes(base_dir, emote_ids):
	"""Tries to download given emote from twitch if it doesn't already exist.
	This happens in the background and errors are ignored.
	"""
	def _ensure_emote(emote_id, theme, scale):
-		try:
-			url = "https://static-cdn.jtvnw.net/emoticons/v2/{}/default/{}/{}".format(emote_id, theme, scale)
-			path = os.path.join(base_dir, "emotes", emote_id, "{}-{}".format(theme, scale))
-			if os.path.exists(path):
-				logging.debug("Emote {} already exists".format(path))
-				return
-			logging.info("Fetching emote from {}".format(url))
-			try:
-				response = requests.get(url)
-			except Exception:
-				logging.warning("Exception while fetching emote from {}".format(url), exc_info=True)
-				return
-			if not response.ok:
-				logging.warning("Error {} while fetching emote from {}: {}".format(response.status_code, url, response.text))
-				return
-			atomic_write(path, response.content)
-			logging.info("Saved emote {}".format(path))
-		finally:
-			assert _EMOTES_RUNNING[base_dir, emote_id, theme, scale] is gevent.getcurrent()
-			del _EMOTES_RUNNING[base_dir, emote_id, theme, scale]
+		url = "https://static-cdn.jtvnw.net/emoticons/v2/{}/default/{}/{}".format(emote_id, theme, scale)
+		path = os.path.join(base_dir, "emotes", emote_id, "{}-{}".format(theme, scale))
+		if os.path.exists(path):
+			logging.debug("Emote {} already exists".format(path))
+			return
+		logging.info("Fetching emote from {}".format(url))
+		try:
+			response = requests.get(url)
+		except Exception:
+			logging.warning("Exception while fetching emote from {}".format(url), exc_info=True)
+			return
+		if not response.ok:
+			logging.warning("Error {} while fetching emote from {}: {}".format(response.status_code, url, response.text))
+			return
+		atomic_write(path, response.content)
+		logging.info("Saved emote {}".format(path))

	for emote_id in emote_ids:
		for theme in ('light', 'dark'):
			for scale in ('1.0', '2.0', '3.0'):
				# to prevent downloading the same emote twice because the first download isn't finished yet,
-				# check if it's already running.
+				# use a KeyedGroup.
				key = base_dir, emote_id, theme, scale
-				if key not in _EMOTES_RUNNING:
-					_EMOTES_RUNNING[key] = gevent.spawn(_ensure_emote, emote_id, theme, scale)
-				else:
-					logging.debug("Skipping checking emote with key {} - already running".format(key))
+				_EMOTES_RUNNING.spawn(key, _ensure_emote, emote_id, theme, scale)


def wait_for_ensure_emotes():
-	gevent.wait(_EMOTES_RUNNING.values())
+	_EMOTES_RUNNING.wait()
+URL_REGEX = re.compile(r"""
+	# Previous char is not a letter. This prevents eg. "foohttp://example.com"
+	# Also disallows / as the previous character, otherwise "file:///foo.bar/baz"
+	# can match on the "foo.bar/baz" part.
+	(?<! [\w/] )
+	# optional scheme, which must be http or https (we don't want other schemes)
+	(?P<scheme> https?:// )?
+	# Hostname, which must contain a dot. Single-part hostnames like "localhost" are valid
+	# but we don't want to match them, and this avoids cases like "yes/no" matching.
+	# We enforce that the TLD is not fully numeric. No TLDs currently look like this
+	# (though it does end up forbidding raw IPv4 addresses), and a common false-positive
+	# is "1.5/10" or similar.
+	( [a-z0-9-]+ \. )+ [a-z][a-z0-9-]+
+	# Optional port
+	( : [0-9]+ )?
+	# Optional path. We assume a path character can be anything that's not completely disallowed
+	# but don't try to parse it further into query, fragment etc.
+	# We also include all unicode characters considered "letters" since it's likely someone might
+	# put a ö or something in a path and copy-paste it from their browser URL bar which renders it
+	# like that even though it's encoded when actually sent as a URL.
+	# Restricting this to letters prevents things like non-breaking spaces causing problems.
+	# For the same reason we also allow {} and [] which seem to show up often in paths.
+	(?P<path> / [\w!#$%&'()*+,./:;=?@_~{}\[\]-]* )?
+""", re.VERBOSE | re.IGNORECASE)
+
+
+_IMAGE_LINKS_RUNNING = KeyedGroup()
+
+def ensure_image_links(base_dir, text):
+	"""Find any image or video links in the text and download them if we don't have them already.
+	This happens in the background and errors are ignored."""
+	media_dir = os.path.join(base_dir, "media")
+
+	def get_url(url):
+		try:
+			try:
+				download_media(url, media_dir)
+			except FailedResponse:
+				# We got a 404 or similar.
+				# Attempt to remove any stray punctuation from the url and try again.
+				# We only try this once.
+				if url.endswith("..."):
+					url = url[:-3]
+				elif not url[-1].isalnum():
+					url = url[:-1]
+				else:
+					# No punctuation found, let the original result stand
+					raise
+				download_media(url, media_dir)
+		except WrongContent as e:
+			logging.info(f"Ignoring non-media link {url}: {e}")
+		except Rejected as e:
+			logging.warning(f"Rejected dangerous link {url}: {e}")
+		except Exception:
+			logging.warning(f"Unable to fetch link {url}", exc_info=True)
+
+	for match in URL_REGEX.finditer(text):
+		# Don't match on bare hostnames with no scheme AND no path. ie.
+		# http://example.com SHOULD match
+		# example.com/foo SHOULD match
+		# example.com SHOULD NOT match
+		# Otherwise we get a false positive every time someone says "fart.wav" or similar.
+		if match.group("scheme") is None and match.group("path") is None:
+			continue
+		url = match.group(0)
+		key = (media_dir, url)
+		_IMAGE_LINKS_RUNNING.spawn(key, get_url, url)


def write_batch(path, batch_time, messages, size_histogram=None):
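
For illustration, a quick check of the matching rules described in the comments above, applying the same bare-hostname filter that ensure_image_links uses (this is a sketch, not part of the diff; sample strings are hypothetical and URL_REGEX is assumed importable from chat_archiver.main):

# Hypothetical sanity check of URL_REGEX and the bare-hostname filter above.
from chat_archiver.main import URL_REGEX

samples = [
	"check out http://example.com",     # scheme but no path: accepted
	"see example.com/foo for details",  # path but no scheme: accepted
	"example.com",                      # bare hostname: matched, then filtered out
	"that was a fart.wav moment",       # hostname-alike: matched, then filtered out
	"rating it 1.5/10",                 # numeric TLD: no match at all
]
for text in samples:
	for match in URL_REGEX.finditer(text):
		if match.group("scheme") is None and match.group("path") is None:
			continue  # same filter as ensure_image_links
		print(repr(text), "->", match.group(0))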
@@ -457,7 +526,7 @@ def merge_batch_files(path, batch_time):
		os.remove(batch_file)


-def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008):
+def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008, download_media=False):
	with open(oauth_token_path) as f:
		oauth_token = f.read()

	# To ensure uniqueness even if multiple instances are running on the same host,

@@ -483,7 +552,7 @@ def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_in
	logging.info("Starting")
	for index in count():
		# To ensure uniqueness between clients, include a client number
-		archiver = Archiver("{}.{}".format(name, index), base_dir, channels, nick, oauth_token)
+		archiver = Archiver("{}.{}".format(name, index), base_dir, channels, nick, oauth_token, download_media)
		archive_worker = gevent.spawn(archiver.run)
		workers = mergers + [archive_worker]

		# wait for either graceful exit, error, or for a signal from the archiver that a reconnect was requested
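
For context, the new download_media keyword reaches main() as a CLI flag via argh; a minimal sketch of the wiring (the exact entrypoint module is assumed here, but the docker-compose change later in this diff does pass --download-media):

# Hypothetical entrypoint sketch: argh exposes download_media=False as a --download-media flag.
import argh
from chat_archiver.main import main

if __name__ == "__main__":
	argh.dispatch_command(main)
	# e.g.: python entrypoint.py botnick /path/to/token "#somechannel" --name node0 --download-media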

@@ -0,0 +1,247 @@
import json
import logging
import os
import re
import socket
import time
import urllib.parse
from base64 import b64encode
from hashlib import sha256
from uuid import uuid4

import gevent
import prometheus_client as prom
import urllib3.connection
from ipaddress import ip_address

from . import atomic_write, ensure_directory, jitter, listdir
from .stats import timed


media_bytes_downloaded = prom.Counter(
	"media_bytes_downloaded",
	"Number of bytes of media files downloaded. Includes data downloaded then later rejected.",
)

media_bytes_saved = prom.Histogram(
	"media_bytes_saved",
	"Size in bytes of downloaded media that was successfully saved",
	["content_type"],
	buckets = [2**n for n in range(11, 27, 2)],
)

media_already_exists = prom.Counter(
	"media_already_exists",
	"Count of times we downloaded a file but it already existed",
)


class Rejected(Exception):
	"""Indicates a non-retryable failure due to the url response violating our constraints"""

class TooLarge(Rejected):
	"""Response was too large"""

class ForbiddenDestination(Rejected):
	"""Hostname resolved to non-global IP"""

class BadScheme(Rejected):
	"""Bad url scheme"""

class WrongContent(Rejected):
	"""Response was not a video or image"""

class FailedResponse(Rejected):
	"""Got a 4xx response, probably a bad link"""


def check_for_media(output_dir, url):
	"""Returns True if we have at least one version of content for the given url already."""
	url_dir = get_url_dir(output_dir, url)
	return any(filename.endswith(".metadata.json") for filename in listdir(url_dir))


@timed()
def download_media(
	url,
	output_dir,
	max_size=128*2**20, # 128MiB
	timeout=60,
	content_types=("image", "video", "application/pdf"),
	max_redirects=5,
	retries=3,
	retry_interval=1,
	chunk_size=64*1024, # 64KiB
):
	"""Make a GET request to a potentially malicious URL and download the content to file.
	We check the following:
	- That the host is a public IP
	- That the response does not exceed given max size (default 128MB)
	- That the content type is in the given list
	  (the list may contain exact types like "image/png" or categories like "image")
	- That the whole thing doesn't take more than a timeout
	Redirects *will* be followed but the follow-up requests must obey the same rules
	(and do not reset the timeout).

	We save the file to OUTPUT_DIR/URL_HASH/FILE_HASH.EXT where EXT is guessed from content-type.
	We save additional metadata including the url and content type to OUTPUT_DIR/URL_HASH/FILE_HASH.metadata.json

	Raises on any rule violation or non-200 response.
	"""
	# Stores a list of urls redirected to, latest is current.
	urls = [url]

	with gevent.Timeout(timeout):
		for redirect_number in range(max_redirects):
			errors = []
			for retry in range(retries):
				if retry > 0:
					gevent.sleep(jitter(retry_interval))
				try:
					resp = _request(urls[-1], max_size, content_types)
					new_url = resp.get_redirect_location()
					if new_url:
						urls.append(new_url)
						break # break from retry loop, continuing in the redirect loop
					_save_response(output_dir, urls, resp, max_size, chunk_size)
					return
				except Rejected:
					raise
				except Exception as e:
					errors.append(e)
					# fall through to next retry loop
			else:
				# This block will be reached if range(retries) runs out but not via "break"
				raise ExceptionGroup(f"All retries failed for url {urls[-1]}", errors)

	raise Exception("Too many redirects")


def hash_to_path(hash):
	return b64encode(hash.digest(), b"-_").decode().rstrip("=")


def get_url_dir(output_dir, url):
	return os.path.join(output_dir, hash_to_path(sha256(url.encode())))


def _save_response(output_dir, urls, resp, max_size, chunk_size):
	url_dir = get_url_dir(output_dir, urls[0])
	temp_path = os.path.join(url_dir, f".{uuid4()}.temp")
	ensure_directory(temp_path)

	content_type = resp.headers["content-type"]
	# Content type may have form "TYPE ; PARAMS", strip params if present.
	# Also normalize for whitespace and case.
	content_type = content_type.split(";")[0].strip().lower()
	# We attempt to convert content type to an extension by taking the second part
	# and stripping anything past the first character not in [a-z0-9-].
	# So eg. "image/png" -> "png", "image/svg+xml" -> "svg", "image/../../../etc/password" -> ""
	ext = content_type.split("/")[-1]
	ext = re.match(r"^[a-z0-9.-]*", ext).group(0)

	try:
		length = 0
		hash = sha256()
		with open(temp_path, "wb") as f:
			while True:
				chunk = resp.read(chunk_size)
				if not chunk:
					break
				hash.update(chunk)
				length += len(chunk)
				media_bytes_downloaded.inc(len(chunk))
				if length > max_size:
					raise TooLarge(f"Read more than {length} bytes from url {urls[-1]}")
				f.write(chunk)

		filename = f"{hash_to_path(hash)}.{ext}"
		filepath = os.path.join(url_dir, filename)
		# This is vulnerable to a race where two things create the file at once,
		# but that's fine since it will always have the same content. This is just an optimization
		# to avoid replacing the file over and over (and for observability)
		if os.path.exists(filepath):
			logging.info(f"Discarding downloaded file for {urls[0]} as it already exists")
			media_already_exists.inc()
		else:
			os.rename(temp_path, filepath)
			logging.info(f"Downloaded file for {urls[0]}")
			media_bytes_saved.labels(content_type).observe(length)
	finally:
		if os.path.exists(temp_path):
			os.remove(temp_path)

	metadata_path = os.path.join(url_dir, f"{hash_to_path(hash)}.metadata.json")
	# Again, this is racy but we don't care about double-writes.
	# Note it's entirely possible for the image to already exist but still write the metadata,
	# this can happen if a previous attempt crashed midway.
	if not os.path.exists(metadata_path):
		metadata = {
			"url": urls[0],
			"filename": filename,
			"redirects": urls[1:],
			"content_type": resp.headers["content-type"],
			"fetched_by": socket.gethostname(),
			"fetch_time": time.time(),
		}
		atomic_write(metadata_path, json.dumps(metadata, indent=4))


def _request(url, max_size, content_types):
	"""Do the actual request and return a vetted response object, which is either the content
	(status 200) or a redirect.
	Raises Rejected if content fails checks, anything else should be considered retryable."""
	parsed = urllib.parse.urlparse(url)
	hostname = parsed.hostname
	port = parsed.port

	ip = socket.gethostbyname(hostname)
	if not ip_address(ip).is_global:
		raise ForbiddenDestination(f"Non-global IP {ip} for url {url}")

	# In order to provide the host/ip to connect to separately from the URL,
	# we need to drop to a fairly low-level interface.
	if parsed.scheme == "http":
		conn = urllib3.connection.HTTPConnection(ip, port or 80)
	elif parsed.scheme == "https":
		conn = urllib3.connection.HTTPSConnection(
			ip, port or 443,
			assert_hostname = hostname,
			server_hostname = hostname,
		)
	else:
		raise BadScheme(f"Bad scheme {parsed.scheme!r} for url {url}")

	conn.request("GET", url, preload_content=False)
	resp = conn.getresponse()

	# Redirects do not require further checks
	if resp.get_redirect_location():
		return resp

	# 4xx errors are non-retryable, anything else is.
	if 400 <= resp.status < 500:
		raise FailedResponse(f"Url returned {resp.status} response: {url}")
	elif not (200 <= resp.status < 300):
		raise Exception(f"Url returned {resp.status} response: {url}")

	content_type = resp.getheader("content-type")
	if content_type is None:
		raise Exception(f"No content-type given for url {url}")
	if not any(content_type.startswith(target) for target in content_types):
		raise WrongContent(f"Disallowed content-type {content_type} for url {url}")

	# If length is known but too large, reject early
	length = resp.getheader("content-length")
	if length is not None:
		try:
			length = int(length)
		except ValueError:
			raise Exception(f"Invalid content length {length!r} for url {url}")
		if length > max_size:
			raise TooLarge(f"Content length {length} is too large for url {url}")

	return resp
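
A short sketch of how the helpers in this new module fit together (not part of the diff; the URL and media directory below are examples only, mirroring how chat_archiver and sheetsync call it elsewhere in this changeset):

# Hypothetical caller of common.media (paths and URL are examples only).
from common.media import check_for_media, download_media, WrongContent, Rejected

url = "https://example.com/clip.mp4"
media_dir = "/mnt/media"

if not check_for_media(media_dir, url):
	try:
		download_media(url, media_dir)
		# On success the content lands at media_dir/<b64(sha256(url))>/<b64(sha256(content))>.mp4
		# with a matching .metadata.json file written next to it.
	except WrongContent as e:
		print("not an allowed content type:", e)
	except Rejected as e:
		print("refused to fetch:", e)  # non-global IP, bad scheme, too large, or a 4xx response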

@@ -180,6 +180,15 @@
  // Extra directories (besides segments) to backfill
  backfill_dirs:: ["emotes"],

+ // Enable saving of media (images and videos - this can be large), either globally or split into
+ // three options:
+ // - From chat messages (in chat_archiver.download_media)
+ // - From the image links column in the sheet (using sheetsync)
+ // - Backfilled from other nodes
+ download_media:: true,
+ backfill_media:: $.download_media,
+ download_sheet_links:: $.download_media,

  // The spreadsheet id and worksheet names for sheet sync to act on
  // Set to null to disable syncing from sheets.
  sheet_id:: "your_id_here",

@@ -220,6 +229,8 @@
    token_path: "./chat_token.txt",
    // Whether to enable backfilling of chat archives to this node (if backfiller enabled)
    backfill: true,
+   // Whether to enable downloading of media (images, videos, etc) that is posted in chat.
+   download_media: $.download_media,
    // Channels to watch. Defaults to "all twitch channels in $.channels" but you can add extras.
    channels: [
      std.split(c, '!')[0]

@@ -365,7 +376,10 @@
    [
      "--base-dir", "/mnt",
      "--qualities", std.join(",", $.qualities + (if $.chat_archiver.backfill then ["chat"] else [])),
-     "--extras", std.join(",", $.backfill_dirs),
+     "--extras", std.join(",",
+       $.backfill_dirs
+       + (if $.backfill_media then ["media"] else [])
+     ),
      "--static-nodes", std.join(",", $.peers),
      "--backdoor-port", std.toString($.backdoor_port),
      "--node-database", $.db_connect,

@@ -456,6 +470,7 @@
      worksheets: $.worksheets,
      edit_url: $.edit_url,
      bustime_start: $.bustime_start,
+     download_media: $.download_sheet_links,
    },
    sync_sheet_base + {
      name+: "sheet-playlists",

@@ -481,7 +496,7 @@
      event_id: $.streamlog_event,
    },
  local sync_streamlog = [
-   sync_streamlog_base + {name: "streamlog-events", type: "events"},
+   sync_streamlog_base + {name: "streamlog-events", type: "events", download_media: $.download_sheet_links},
    sync_streamlog_base + {name: "streamlog-playlists", type: "playlists"},
  ],
  local config = (

@@ -493,11 +508,15 @@
    command: [
      "--backdoor-port", std.toString($.backdoor_port),
      $.db_connect,
-   ] + std.map(std.manifestJson, config),
-   // Mount the creds file(s) into /etc
+   ]
+   + (if $.download_sheet_links then ["--media-dir=/mnt"] else [])
+   + std.map(std.manifestJson, config),
    volumes: std.prune([
+     // Mount the creds file(s) into /etc
      if $.sheet_id != null then "%s:/etc/sheet-creds.json" % $.sheet_creds_file,
      if $.streamlog_url != null then "%s:/etc/streamlog-token.txt" % $.streamlog_creds_file,
+     // Mount the segments media directory
+     if $.download_sheet_links then "%s/media:/mnt" % $.segments_path,
    ]),
    // If the application crashes, restart it.
    restart: "on-failure",

@@ -614,7 +633,11 @@
  [if $.enabled.chat_archiver then "chat_archiver"]: {
    image: $.get_image("chat_archiver"),
    restart: "always",
-   command: [$.chat_archiver.user, "/token"] + $.chat_archiver.channels + ["--name", $.localhost],
+   command:
+     [$.chat_archiver.user, "/token"]
+     + $.chat_archiver.channels
+     + ["--name", $.localhost]
+     + (if $.chat_archiver.download_media then ["--download-media"] else []),
    volumes: ["%s:/mnt" % $.segments_path, "%s:/token" % $.chat_archiver.token_path],
    [if "chat_archiver" in $.ports then "ports"]: ["%s:8008" % $.ports.chat_archiver],
    environment: $.env,

@@ -3,6 +3,7 @@ import json
import logging
import signal
from collections import defaultdict
+from urllib.parse import urlparse

import argh
import gevent.backdoor

@@ -15,6 +16,7 @@ from requests import HTTPError
import common
import common.dateutil
from common.database import DBManager, query, get_column_placeholder
+from common.media import check_for_media, download_media
from common.sheets import Sheets as SheetsClient

from .sheets import SheetsEventsMiddleware, SheetsPlaylistsMiddleware, SheetsArchiveMiddleware

@@ -274,6 +276,11 @@ class EventsSync(SheetSync):
		"category",
	}

+	def __init__(self, name, middleware, stop, dbmanager, reverse_sync=False, media_dir=None):
+		super().__init__(name, middleware, stop, dbmanager, reverse_sync)
+		self.media_dir = media_dir
+		self.media_downloads = None if media_dir is None else {}
+
	def observe_rows(self, rows):
		counts = defaultdict(lambda: 0)
		for row in rows:

@@ -287,6 +294,25 @@ class EventsSync(SheetSync):
	def sync_row(self, sheet_row, db_row):
		# Do some special-case transforms for events before syncing

+		# Attempt to download any URLs in the links column if we don't already have them.
+		# This is done asynchronously. We keep a record of failed attempts for two reasons:
+		# - To avoid retrying
+		# - To populate the errors column asynchronously
+		# This record is just in memory - we're ok retrying after every restart.
+		# You can disable downloads on a per-row basis by putting "[nodownload]" in the notes column.
+		if sheet_row is not None and self.media_dir is not None and "[nodownload]" not in sheet_row["notes"]:
+			for url in sheet_row['image_links']:
+				if url not in self.media_downloads:
+					self.media_downloads[url] = gevent.spawn(self.download_media, url)
+				# Greenlet.exception is populated if the greenlet failed with an exception,
+				# or None otherwise (success or not finished).
+				# We treat a failure to fetch a URL like a parse error.
+				e = self.media_downloads[url].exception
+				if e is not None:
+					sheet_row.setdefault("_parse_errors", []).append(
+						f"Failed to download media link {url!r}: {e}"
+					)
+
		if db_row is not None:
			# If no database error, but we have parse errors, indicate they should be displayed.
			if db_row.error is None and sheet_row is not None and sheet_row.get('_parse_errors'):

@@ -300,6 +326,20 @@

		super().sync_row(sheet_row, db_row)

+	def download_media(self, url):
+		hostname = urlparse(url).hostname
+		if hostname in ("youtu.be", "youtube.com"):
+			self.logger.info(f"Ignoring url {url!r}: Blocklisted hostname")
+			return
+		if check_for_media(self.media_dir, url):
+			self.logger.info(f"Already have content for url {url!r}")
+			return
+		try:
+			download_media(url, self.media_dir)
+		except Exception:
+			self.logger.warning(f"Failed to download url {url!r}", exc_info=True)
+			raise
+		self.logger.info(f"Downloaded media for url {url!r}")

class ArchiveSync(EventsSync):
	# Archive events are a special case of event with less input columns.
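
The error reporting in sync_row above relies on gevent's Greenlet.exception attribute; a standalone sketch of that pattern (not part of the diff; the failing task is hypothetical):

# Standalone sketch of the Greenlet.exception pattern used above.
import gevent

def task():
	raise ValueError("download failed")

g = gevent.spawn(task)
print(g.exception)  # None: the greenlet hasn't run to completion yet
g.join()
print(g.exception)  # ValueError('download failed') once it has failed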
@@ -372,7 +412,7 @@ class PlaylistsSync(SheetSync):
		event_id: The id of the streamlog event to sync
	""",
)
-def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
+def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0, media_dir="."):
	"""
	Sheet sync constantly scans a Google Sheets sheet and a database, copying inputs from the sheet
	to the DB and outputs from the DB to the sheet.

@@ -466,7 +506,10 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
			"playlists": PlaylistsSync,
			"archive": ArchiveSync,
		}[config["type"]]
-		sync = sync_class(config["name"], middleware, stop, dbmanager, reverse_sync)
+		sync_class_kwargs = {}
+		if config["type"] == "events" and config.get("download_media", False):
+			sync_class_kwargs["media_dir"] = media_dir
+		sync = sync_class(config["name"], middleware, stop, dbmanager, reverse_sync, **sync_class_kwargs)
		workers.append(sync)

	jobs = [gevent.spawn(worker.run) for worker in workers]