From e74d655ce525ede60b54f78989b49caa9b4eb3ad Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 7 Nov 2022 22:48:00 +1100 Subject: [PATCH] chat_archiver: Download each seen emote so we have a permanent record, in case they're deleted or changed later --- chat_archiver/chat_archiver/main.py | 62 ++++++++++++++++++++++++++--- chat_archiver/setup.py | 1 + 2 files changed, 57 insertions(+), 6 deletions(-) diff --git a/chat_archiver/chat_archiver/main.py b/chat_archiver/chat_archiver/main.py index ef15b88..6f4dfab 100644 --- a/chat_archiver/chat_archiver/main.py +++ b/chat_archiver/chat_archiver/main.py @@ -18,12 +18,13 @@ from uuid import uuid4 import gevent.event import gevent.queue -from common import ensure_directory, listdir +from common import ensure_directory, listdir, rename from common.chat import BATCH_INTERVAL, format_batch, get_batch_files, merge_messages from girc import Client from monotonic import monotonic import prometheus_client as prom +import requests # These are known to arrive up to MAX_DELAY after their actual time @@ -117,6 +118,7 @@ class Archiver(object): self.name = name self.messages = gevent.queue.Queue() self.channel = channel + self.base_dir = base_dir self.path = os.path.join(base_dir, channel, "chat") self.stopping = gevent.event.Event() @@ -212,6 +214,11 @@ class Archiver(object): self.logger.debug("Got message: {}".format(data)) messages_received.labels(channel=self.channel, client=id(self), command=message.command).inc() + if data['tags'] and 'emotes' in data['tags']: + emote_specs = data['tags']['emotes'].split('/') + emote_ids = [emote_spec.split(':')[0] for emote_spec in emote_specs] + ensure_emotes(self.base_dir, emote_ids) + if data['tags'] and 'tmi-sent-ts' in data['tags']: # explicit server time is available timestamp = int(data['tags']['tmi-sent-ts']) / 1000. 
def atomic_write(filepath, content):
    """Write content to filepath via a uniquely-named temporary file.

    The bytes are first written to "<filepath>.<uuid4>.temp" and that file is
    then handed to rename() to be moved to filepath, so a reader should never
    see a half-written file at filepath itself.

    filepath: destination path. ensure_directory() is called on it before writing
        (presumably to create the parent directory - confirm against common).
    content: bytes to write; the temporary file is opened in binary mode.

    Any exception from writing or renaming is re-raised after a best-effort
    attempt to delete the temporary file.
    """
    temp_path = "{}.{}.temp".format(filepath, uuid4())
    ensure_directory(filepath)
    try:
        with open(temp_path, 'wb') as f:
            f.write(content)
        rename(temp_path, filepath)
    except BaseException:
        # Don't leave orphaned temp files behind on a failed or interrupted write.
        # After a successful rename we never get here, so this can't delete real data.
        try:
            os.remove(temp_path)
        except OSError:
            pass
        raise


_EMOTES_RUNNING = {} # map (base_dir, emote id, theme, scale) -> in-progress greenlet fetching that path
# Seconds to wait on the emote CDN before giving up. Without a timeout a stalled
# connection would keep its key in _EMOTES_RUNNING forever, blocking any retry.
_EMOTE_FETCH_TIMEOUT = 30


def ensure_emotes(base_dir, emote_ids):
    """Tries to download each given emote from twitch if it doesn't already exist.

    For every emote id, each combination of theme (light, dark) and scale
    (1.0, 2.0, 3.0) is saved to BASE_DIR/emotes/EMOTE_ID/THEME-SCALE.
    Each fetch runs in its own background greenlet and this function returns
    without waiting for them. Errors are logged and otherwise ignored.

    base_dir: directory under which the "emotes" directory is written.
    emote_ids: iterable of emote id strings. Empty ids are skipped.
    """
    def _ensure_emote(emote_id, theme, scale):
        key = base_dir, emote_id, theme, scale
        try:
            url = "https://static-cdn.jtvnw.net/emoticons/v2/{}/default/{}/{}".format(emote_id, theme, scale)
            path = os.path.join(base_dir, "emotes", emote_id, "{}-{}".format(theme, scale))
            if os.path.exists(path):
                logging.debug("Emote {} already exists".format(path))
                return
            logging.info("Fetching emote from {}".format(url))
            try:
                response = requests.get(url, timeout=_EMOTE_FETCH_TIMEOUT)
            except Exception:
                logging.warning("Exception while fetching emote from {}".format(url), exc_info=True)
                return
            if not response.ok:
                logging.warning("Error {} while fetching emote from {}: {}".format(response.status_code, url, response.text))
                return
            try:
                atomic_write(path, response.content)
            except Exception:
                # Saving is best-effort too (eg. disk full, permissions): log and move on.
                logging.warning("Exception while saving emote {}".format(path), exc_info=True)
                return
            logging.info("Saved emote {}".format(path))
        finally:
            # Always release our slot so a later message can retry this emote.
            # This is checked explicitly rather than with assert, since an assert is
            # stripped under -O and, if it failed here, would skip the cleanup.
            if _EMOTES_RUNNING.get(key) is gevent.getcurrent():
                del _EMOTES_RUNNING[key]
            else:
                logging.warning("Emote fetch for key {} was not registered to this greenlet".format(key))

    for emote_id in emote_ids:
        if not emote_id:
            # Splitting an empty emotes tag value on '/' and ':' yields a single empty id.
            # Fetching it would hit a malformed URL and write outside any emote's directory.
            logging.debug("Skipping empty emote id")
            continue
        for theme in ('light', 'dark'):
            for scale in ('1.0', '2.0', '3.0'):
                # to prevent downloading the same emote twice because the first download isn't finished yet,
                # check if it's already running.
                key = base_dir, emote_id, theme, scale
                if key not in _EMOTES_RUNNING:
                    _EMOTES_RUNNING[key] = gevent.spawn(_ensure_emote, emote_id, theme, scale)
                else:
                    logging.debug("Skipping checking emote with key {} - already running".format(key))