@ -18,12 +18,13 @@ from uuid import uuid4
import gevent.event
import gevent.queue
from common import ensure_directory, listdir
from common import ensure_directory, listdir, rename
from common.chat import BATCH_INTERVAL, format_batch, get_batch_files, merge_messages
from girc import Client
from monotonic import monotonic
import prometheus_client as prom
import requests
# These are known to arrive up to MAX_DELAY after their actual time
@ -117,6 +118,7 @@ class Archiver(object):
self.name = name
self.messages = gevent.queue.Queue()
self.channel = channel
self.base_dir = base_dir
self.path = os.path.join(base_dir, channel, "chat")
self.stopping = gevent.event.Event()
@ -212,6 +214,11 @@ class Archiver(object):
self.logger.debug("Got message: {}".format(data))
messages_received.labels(channel=self.channel, client=id(self), command=message.command).inc()
if data['tags'] and 'emotes' in data['tags']:
emote_specs = data['tags']['emotes'].split('/')
emote_ids = [emote_spec.split(':')[0] for emote_spec in emote_specs]
ensure_emotes(self.base_dir, emote_ids)
if data['tags'] and 'tmi-sent-ts' in data['tags']:
# explicit server time is available
timestamp = int(data['tags']['tmi-sent-ts']) / 1000. # original is int ms
@ -268,6 +275,53 @@ class Archiver(object):
def atomic_write(filepath, content):
temp_path = "{}.{}.temp".format(filepath, uuid4())
with open(temp_path, 'wb') as f:
rename(temp_path, filepath)
_EMOTES_RUNNING = {} # map (base_dir, emote id, theme, scale) -> in-progress greenlet fetching that path
def ensure_emotes(base_dir, emote_ids):
"""Tries to download given emote from twitch if it doesn't already exist.
This happens in the background and errors are ignored.
def _ensure_emote(emote_id, theme, scale):
url = "https://static-cdn.jtvnw.net/emoticons/v2/{}/default/{}/{}".format(emote_id, theme, scale)
path = os.path.join(base_dir, "emotes", emote_id, "{}-{}".format(theme, scale))
if os.path.exists(path):
logging.debug("Emote {} already exists".format(path))
logging.info("Fetching emote from {}".format(url))
response = requests.get(url)
except Exception:
logging.warning("Exception while fetching emote from {}".format(url), exc_info=True)
if not response.ok:
logging.warning("Error {} while fetching emote from {}: {}".format(response.status_code, url, response.text))
atomic_write(path, response.content)
logging.info("Saved emote {}".format(path))
assert _EMOTES_RUNNING[base_dir, emote_id, theme, scale] is gevent.getcurrent()
del _EMOTES_RUNNING[base_dir, emote_id, theme, scale]
for emote_id in emote_ids:
for theme in ('light', 'dark'):
for scale in ('1.0', '2.0', '3.0'):
# to prevent downloading the same emote twice because the first download isn't finished yet,
# check if it's already running.
key = base_dir, emote_id, theme, scale
if key not in _EMOTES_RUNNING:
_EMOTES_RUNNING[key] = gevent.spawn(_ensure_emote, emote_id, theme, scale)
logging.debug("Skipping checking emote with key {} - already running".format(key))
def write_batch(path, batch_time, messages, size_histogram=None):
"""Batches are named PATH/YYYY-MM-DDTHH/MM:SS-HASH.json"""
output = (format_batch(messages) + '\n').encode('utf-8')
@ -281,11 +335,7 @@ def write_batch(path, batch_time, messages, size_histogram=None):
if os.path.exists(filepath):
logging.debug("Not writing batch {} - already exists.".format(filename))
temppath = "{}.{}.temp".format(filepath, uuid4())
with open(temppath, 'wb') as f:
os.rename(temppath, filepath)
atomic_write(filepath, output)
logging.info("Wrote batch {}".format(filepath))
return filepath