From b80b693f102827f0617bf6034bbfd18e121953bb Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Sun, 1 Sep 2024 13:09:38 +1000 Subject: [PATCH] chat-archiver: extract the ensure_emotes greenlet management to a class --- chat_archiver/chat_archiver/ensure_emotes.py | 1 - chat_archiver/chat_archiver/keyed_group.py | 27 +++++++++++ chat_archiver/chat_archiver/main.py | 47 +++++++++----------- 3 files changed, 48 insertions(+), 27 deletions(-) create mode 100644 chat_archiver/chat_archiver/keyed_group.py diff --git a/chat_archiver/chat_archiver/ensure_emotes.py b/chat_archiver/chat_archiver/ensure_emotes.py index 4e968eb..f27e9a0 100644 --- a/chat_archiver/chat_archiver/ensure_emotes.py +++ b/chat_archiver/chat_archiver/ensure_emotes.py @@ -4,7 +4,6 @@ gevent.monkey.patch_all() import argh import logging -import json from chat_archiver.main import ensure_emotes, wait_for_ensure_emotes diff --git a/chat_archiver/chat_archiver/keyed_group.py b/chat_archiver/chat_archiver/keyed_group.py new file mode 100644 index 0000000..65a1388 --- /dev/null +++ b/chat_archiver/chat_archiver/keyed_group.py @@ -0,0 +1,27 @@ + +"""A group of greenlets running tasks. +Each task has a key. If a task with that key is already running, +it is not re-run.""" + +import gevent + + +class KeyedGroup: + def __init__(self): + self.greenlets = {} + + def spawn(self, key, func, *args, **kwargs): + if key not in self.greenlets: + self.greenlets[key] = gevent.spawn(self._wrapper, key, func, *args, **kwargs) + return self.greenlets[key] + + def wait(self): + """Blocks until all tasks started before wait() was called are finished.""" + gevent.wait(self.greenlets.values()) + + def _wrapper(self, key, func, *args, **kwargs): + try: + return func(*args, **kwargs) + finally: + assert self.greenlets[key] is gevent.getcurrent() + del self.greenlets[key] diff --git a/chat_archiver/chat_archiver/main.py b/chat_archiver/chat_archiver/main.py index 5b2e35a..ff922b7 100644 --- a/chat_archiver/chat_archiver/main.py +++ b/chat_archiver/chat_archiver/main.py @@ -25,6 +25,8 @@ from monotonic import monotonic import prometheus_client as prom import requests +from .keyed_group import KeyedGroup + # These are known to arrive up to MAX_DELAY after their actual time DELAYED_COMMANDS = [ @@ -306,47 +308,40 @@ class Archiver(object): self.client.stop() -_EMOTES_RUNNING = {} # map (base_dir, emote id, theme, scale) -> in-progress greenlet fetching that path +_EMOTES_RUNNING = KeyedGroup() def ensure_emotes(base_dir, emote_ids): """Tries to download given emote from twitch if it doesn't already exist. This happens in the background and errors are ignored. """ def _ensure_emote(emote_id, theme, scale): + url = "https://static-cdn.jtvnw.net/emoticons/v2/{}/default/{}/{}".format(emote_id, theme, scale) + path = os.path.join(base_dir, "emotes", emote_id, "{}-{}".format(theme, scale)) + if os.path.exists(path): + logging.debug("Emote {} already exists".format(path)) + return + logging.info("Fetching emote from {}".format(url)) try: - url = "https://static-cdn.jtvnw.net/emoticons/v2/{}/default/{}/{}".format(emote_id, theme, scale) - path = os.path.join(base_dir, "emotes", emote_id, "{}-{}".format(theme, scale)) - if os.path.exists(path): - logging.debug("Emote {} already exists".format(path)) - return - logging.info("Fetching emote from {}".format(url)) - try: - response = requests.get(url) - except Exception: - logging.warning("Exception while fetching emote from {}".format(url), exc_info=True) - return - if not response.ok: - logging.warning("Error {} while fetching emote from {}: {}".format(response.status_code, url, response.text)) - return - atomic_write(path, response.content) - logging.info("Saved emote {}".format(path)) - finally: - assert _EMOTES_RUNNING[base_dir, emote_id, theme, scale] is gevent.getcurrent() - del _EMOTES_RUNNING[base_dir, emote_id, theme, scale] + response = requests.get(url) + except Exception: + logging.warning("Exception while fetching emote from {}".format(url), exc_info=True) + return + if not response.ok: + logging.warning("Error {} while fetching emote from {}: {}".format(response.status_code, url, response.text)) + return + atomic_write(path, response.content) + logging.info("Saved emote {}".format(path)) for emote_id in emote_ids: for theme in ('light', 'dark'): for scale in ('1.0', '2.0', '3.0'): # to prevent downloading the same emote twice because the first download isn't finished yet, - # check if it's already running. + # use a KeyedGroup. key = base_dir, emote_id, theme, scale - if key not in _EMOTES_RUNNING: - _EMOTES_RUNNING[key] = gevent.spawn(_ensure_emote, emote_id, theme, scale) - else: - logging.debug("Skipping checking emote with key {} - already running".format(key)) + _EMOTES_RUNNING.spawn(key, _ensure_emote, emote_id, theme, scale) def wait_for_ensure_emotes(): - gevent.wait(_EMOTES_RUNNING.values()) + _EMOTES_RUNNING.wait() def write_batch(path, batch_time, messages, size_histogram=None):