chat-archiver: extract the ensure_emotes greenlet management to a class

pull/408/head
Mike Lang 3 months ago committed by Mike Lang
parent 3f6263a037
commit 07055e3605

@ -4,7 +4,6 @@ gevent.monkey.patch_all()
import argh import argh
import logging import logging
import json
from chat_archiver.main import ensure_emotes, wait_for_ensure_emotes from chat_archiver.main import ensure_emotes, wait_for_ensure_emotes

@ -0,0 +1,27 @@
"""A group of greenlets running tasks.
Each task has a key. If a task with that key is already running,
it is not re-run."""
import gevent
class KeyedGroup:
def __init__(self):
self.greenlets = {}
def spawn(self, key, func, *args, **kwargs):
if key not in self.greenlets:
self.greenlets[key] = gevent.spawn(self._wrapper, key, func, *args, **kwargs)
return self.greenlets[key]
def wait(self):
"""Blocks until all tasks started before wait() was called are finished."""
gevent.wait(self.greenlets.values())
def _wrapper(self, key, func, *args, **kwargs):
try:
return func(*args, **kwargs)
finally:
assert self.greenlets[key] is gevent.getcurrent()
del self.greenlets[key]

@ -25,6 +25,8 @@ from monotonic import monotonic
import prometheus_client as prom import prometheus_client as prom
import requests import requests
from .keyed_group import KeyedGroup
# These are known to arrive up to MAX_DELAY after their actual time # These are known to arrive up to MAX_DELAY after their actual time
DELAYED_COMMANDS = [ DELAYED_COMMANDS = [
@ -306,13 +308,12 @@ class Archiver(object):
self.client.stop() self.client.stop()
_EMOTES_RUNNING = {} # map (base_dir, emote id, theme, scale) -> in-progress greenlet fetching that path _EMOTES_RUNNING = KeyedGroup()
def ensure_emotes(base_dir, emote_ids): def ensure_emotes(base_dir, emote_ids):
"""Tries to download given emote from twitch if it doesn't already exist. """Tries to download given emote from twitch if it doesn't already exist.
This happens in the background and errors are ignored. This happens in the background and errors are ignored.
""" """
def _ensure_emote(emote_id, theme, scale): def _ensure_emote(emote_id, theme, scale):
try:
url = "https://static-cdn.jtvnw.net/emoticons/v2/{}/default/{}/{}".format(emote_id, theme, scale) url = "https://static-cdn.jtvnw.net/emoticons/v2/{}/default/{}/{}".format(emote_id, theme, scale)
path = os.path.join(base_dir, "emotes", emote_id, "{}-{}".format(theme, scale)) path = os.path.join(base_dir, "emotes", emote_id, "{}-{}".format(theme, scale))
if os.path.exists(path): if os.path.exists(path):
@ -329,24 +330,18 @@ def ensure_emotes(base_dir, emote_ids):
return return
atomic_write(path, response.content) atomic_write(path, response.content)
logging.info("Saved emote {}".format(path)) logging.info("Saved emote {}".format(path))
finally:
assert _EMOTES_RUNNING[base_dir, emote_id, theme, scale] is gevent.getcurrent()
del _EMOTES_RUNNING[base_dir, emote_id, theme, scale]
for emote_id in emote_ids: for emote_id in emote_ids:
for theme in ('light', 'dark'): for theme in ('light', 'dark'):
for scale in ('1.0', '2.0', '3.0'): for scale in ('1.0', '2.0', '3.0'):
# to prevent downloading the same emote twice because the first download isn't finished yet, # to prevent downloading the same emote twice because the first download isn't finished yet,
# check if it's already running. # use a KeyedGroup.
key = base_dir, emote_id, theme, scale key = base_dir, emote_id, theme, scale
if key not in _EMOTES_RUNNING: _EMOTES_RUNNING.spawn(key, _ensure_emote, emote_id, theme, scale)
_EMOTES_RUNNING[key] = gevent.spawn(_ensure_emote, emote_id, theme, scale)
else:
logging.debug("Skipping checking emote with key {} - already running".format(key))
def wait_for_ensure_emotes(): def wait_for_ensure_emotes():
gevent.wait(_EMOTES_RUNNING.values()) _EMOTES_RUNNING.wait()
def write_batch(path, batch_time, messages, size_histogram=None): def write_batch(path, batch_time, messages, size_histogram=None):

Loading…
Cancel
Save