From 0756539b857dfa2a76c260bebcdc7db7ed8ba0ad Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 8 Nov 2021 14:59:52 +1100 Subject: [PATCH] chat-archiver: Early work and basic archival --- .gitmodules | 3 + chat_archiver/Dockerfile | 6 + chat_archiver/README | 53 ++++++ chat_archiver/chat_archiver/__init__.py | 0 chat_archiver/chat_archiver/__main__.py | 15 ++ chat_archiver/chat_archiver/main.py | 209 ++++++++++++++++++++++++ chat_archiver/girc | 1 + chat_archiver/setup.py | 14 ++ docker-compose.jsonnet | 16 ++ 9 files changed, 317 insertions(+) create mode 100644 .gitmodules create mode 100644 chat_archiver/Dockerfile create mode 100644 chat_archiver/README create mode 100644 chat_archiver/chat_archiver/__init__.py create mode 100644 chat_archiver/chat_archiver/__main__.py create mode 100644 chat_archiver/chat_archiver/main.py create mode 160000 chat_archiver/girc create mode 100644 chat_archiver/setup.py diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..4708421 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "chat_archiver/girc"] + path = chat_archiver/girc + url = https://github.com/ekimekim/girc diff --git a/chat_archiver/Dockerfile b/chat_archiver/Dockerfile new file mode 100644 index 0000000..23546a5 --- /dev/null +++ b/chat_archiver/Dockerfile @@ -0,0 +1,6 @@ +FROM quay.io/ekimekim/wubloader-downloader:32138bb +COPY girc /tmp/girc +RUN pip install /tmp/girc && rm -r /tmp/girc +COPY . /tmp/archiver +RUN pip install /tmp/archiver && rm -r /tmp/archiver +ENTRYPOINT ["python", "-m", "chat_archiver", "--base-dir", "/mnt"] diff --git a/chat_archiver/README b/chat_archiver/README new file mode 100644 index 0000000..6101387 --- /dev/null +++ b/chat_archiver/README @@ -0,0 +1,53 @@ +Chat archiver records messages from TMI (the twitch messaging system) +in a way that preserves as much context as possible, and allows multiple independently-recorded +streams to be combined to ensure nothing was missed. + +We store messages in newline-delimited JSON files, in timestamp order. +Each file covers one minute of messages. +These files are named by their timestamp + hash and merged with other files via a CRDT model. + CRDT means you have a merge operation (.) such that + (A.B).C == A.(B.C) (associative) + A.B == B.A (commutitive) + A.A == A (reflexive) + +We have a few different kinds of messages to deal with: + Messages with a unique id and timestamp + eg. PRIVMSG, USERNOTICE. + These are easy, as they have a canonical timestamp and can be trivially deduplicated by id. + Messages with an implied ordering + eg. ROOMSTATE, NOTICE, CLEARCHAT. + These messages arrive in what we assume is a consistent order on all clients, + but have no direct associated timestamp. We thus set a timestamp *range* for when + the message could have occurred from the server's perspective, between the last known + server timestamp (since it must be after that message) and the next received server timestamp + (since it must be before that message). We can set some reasonable timeout here in case + we don't receive another message within a short time window. + Messages with a timestamp range are ordered by their timestamp start. + This also governs which file they are in if their range overlaps two files. + Messages known to be out of order + This is specific to JOINs and PARTs. + Twitch documents that these may be delayed by up to 10sec. + So we follow the rules as per messages with implied ordering, + except we pad the possible start time by 10 seconds. + +How to merge two files + In general, if the same message (all non-receiver fields identical) is present in both files, + it is included once in the output. For messages with unique ids, this is all that's needed. + For messages without unique ids, we face the question of "is this the same message". + All the following must be true: + * All non-timestamp, non-receiver fields match + * Timestamp ranges overlap + If a message may match multiple messages on the other side with these rules, then + we pick one arbitrarily. + We then merge these messages, setting the timestamp range to the intersection of the inputs. + Literal edge cases: Timestamp ranges that span two files + It may be the case that we can match a message whose timestamp range overhangs file A + with a message near the start of file B. So whenever we are merging files, we need to + also consider the file before and the file after on our side. + In all cases when we merge two messages, we should merge the receiver timestamp field which maps + the receiver id to the timestamp it received the message. This preserves message provedence. + +All files are stored in newline-delimited, canonicalised JSON so we can use hashes to compare them. +It should always be true that merging B into A and merging A into B should produce identical files +with the same hash (effects of neighboring files notwithstanding - that just means more merges will +be needed in order to stabilize). diff --git a/chat_archiver/chat_archiver/__init__.py b/chat_archiver/chat_archiver/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chat_archiver/chat_archiver/__main__.py b/chat_archiver/chat_archiver/__main__.py new file mode 100644 index 0000000..f60c3fc --- /dev/null +++ b/chat_archiver/chat_archiver/__main__.py @@ -0,0 +1,15 @@ +import gevent.monkey +gevent.monkey.patch_all() + +import logging +import os + +import argh + +from .main import main + +LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s" + +level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper() +logging.basicConfig(level=level, format=LOG_FORMAT) +argh.dispatch_command(main) diff --git a/chat_archiver/chat_archiver/main.py b/chat_archiver/chat_archiver/main.py new file mode 100644 index 0000000..db9c3a5 --- /dev/null +++ b/chat_archiver/chat_archiver/main.py @@ -0,0 +1,209 @@ + +import base64 +import hashlib +import json +import logging +import os +import signal +import socket +import time +from datetime import datetime +from uuid import uuid4 + +import gevent.event +import gevent.queue + +from common import ensure_directory + +from girc import Client + + +class Archiver(object): + # These are known to arrive up to 10s after their actual time + DELAYED_COMMANDS = [ + "JOIN", + "PART", + ] + + COMMANDS = DELAYED_COMMANDS + [ + "PRIVMSG", + "CLEARCHAT", + "CLEARMSG", + "HOSTTARGET", + "NOTICE", + "ROOMSTATE", + "USERNOTICE", + "USERSTATE", + ] + + # How long each batch is + BATCH_INTERVAL = 60 + + # Assume we're never more than this amount of time behind the server time + # Worst case if too low: multiple output files for same batch that need merging later + MAX_SERVER_LAG = 30 + + # When guessing when a non-timestamped event occurred, pad the possible range + # by up to this amount before and after our best guess + ESTIMATED_TIME_PADDING = 5 + + def __init__(self, name, base_dir, channel, nick, oauth_token): + self.logger = logging.getLogger(type(self).__name__).getChild(channel) + self.name = name + self.messages = gevent.queue.Queue() + self.path = os.path.join(base_dir, channel) + + self.stopping = gevent.event.Event() + self.client = Client( + hostname='irc.chat.twitch.tv', + port=6697, + ssl=True, + nick=nick, + password=oauth_token, + twitch=True, + stop_handler=lambda c: self.stopping.set(), + ) + self.client.channel('#{}'.format(channel)).join() + + def run(self): + # wait for twitch to send the initial ROOMSTATE for the room we've joined. + # everything preceeding this message is per-connection stuff we don't care about. + # once we get it, we register the handler to put everything following onto the + # message queue. + @self.client.handler(command='ROOMSTATE', sync=True) + def register_real_handler(client, message): + self.client.handler(lambda c, m: self.messages.put(m), sync=True) + return True + + self.client.start() + + last_server_time = None + last_timestamped_message = None + # {batch time: [messages]} + batches = {} + + while not self.stopping.is_set(): + # wait until we either have a message, are stopping, or a batch can be closed + if batches: + next_batch_close = min(batches.keys()) + self.BATCH_INTERVAL + self.MAX_SERVER_LAG + self.logger.debug("Next batch close at {} (batch times: {})".format(next_batch_close, batches.keys())) + timeout = max(0, next_batch_close - time.time()) + else: + timeout = None + self.logger.debug("Waiting up to {} for message or stop".format(timeout)) + gevent.wait([gevent.spawn(self.messages.peek), self.stopping], count=1, timeout=timeout) + + # close any closable batches + now = time.time() + for batch_time, messages in list(batches.items()): + if now >= batch_time + self.BATCH_INTERVAL + self.MAX_SERVER_LAG: + del batches[batch_time] + self.write_batch(batch_time, messages) + + # consume a message if any + try: + message = self.messages.get(block=False) + except gevent.queue.Empty: + continue + + if message.command not in self.COMMANDS: + self.logger.info("Skipping non-whitelisted command: {}".format(message.command)) + continue + + self.logger.debug("Got message: {}".format(message)) + data = { + attr: getattr(message, attr) + for attr in ('command', 'params', 'sender', 'user', 'host', 'tags') + } + data['receivers'] = {self.name: message.received_at} + self.logger.debug("Got message: {}".format(data)) + + if data['tags'] and 'tmi-sent-ts' in data['tags']: + # explicit server time is available + timestamp = int(data['tags']['tmi-sent-ts']) / 1000. # original is int ms + last_timestamped_message = message + last_server_time = timestamp + time_range = 0 + self.logger.debug("Message has exact timestamp: {}".format(timestamp)) + # check for any non-timestamped messages which we now know must have been + # before this message. We need to check this batch and the previous. + batch_time = int(timestamp / self.BATCH_INTERVAL) * self.BATCH_INTERVAL + for batch in (batch_time, batch_time - self.BATCH_INTERVAL): + for msg in batches.get(batch, []): + time_between = timestamp - msg['time'] + if 0 < time_between < msg['time_range']: + self.logger.debug("Updating previous message {m[command]}@{m[time]} range {m[time_range]} -> {new}".format( + m=msg, new=time_between, + )) + msg['time_range'] = time_between + elif last_server_time is not None: + # estimate current server time based on time since last timestamped message + est_server_time = last_server_time + time.time() - last_timestamped_message.received_at + # pad either side of the estimated server time, use this as a baseline + timestamp = est_server_time - self.ESTIMATED_TIME_PADDING + time_range = 2 * self.ESTIMATED_TIME_PADDING + # if previously timestamped message falls within this range, we know this message + # came after it + timestamp = max(timestamp, last_server_time) + else: + # we have no idea what the server time is, so we guess as 2x the normal padding + # starting from local time. + timestamp = time.time() - 2 * self.ESTIMATED_TIME_PADDING + time_range = 3 * self.ESTIMATED_TIME_PADDING + + if data['command'] in self.DELAYED_COMMANDS: + # might have happened 10s sooner than otherwise indicated. + timestamp -= 10 + time_range += 10 + + self.logger.debug("Message time determined as {} + up to {}".format(timestamp, time_range)) + data['time'] = timestamp + data['time_range'] = time_range + batch_time = int(timestamp / self.BATCH_INTERVAL) * self.BATCH_INTERVAL + batches.setdefault(batch_time, []).append(data) + + # Close any remaining batches + for batch_time, messages in batches.items(): + self.write_batch(batch_time, messages) + + self.client.wait_for_stop() # re-raise any errors + + def write_batch(self, batch_time, messages): + # We need to take some care to have a consistent ordering and format here. + # We use a "canonicalised JSON" format, which is really just whatever the python encoder does, + # with compact separators. + messages = [ + (message, json.dumps(message, separators=(',', ':'))) + for message in messages + ] + # We sort by timestamp, then timestamp range, then if all else fails, lexiographically + # on the encoded representation. + messages.sort(key=lambda item: (item[0]['time'], item[0]['time_range'], item[1])) + output = ("\n".join(line for message, line in messages) + "\n").encode("utf-8") + hash = base64.b64encode(hashlib.sha256(output).digest(), b"-_").decode().rstrip("=") + time = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H:%M:%S") + filename = "{}-{}.json".format(time, hash) + filepath = os.path.join(self.path, filename) + temppath = "{}.{}.temp".format(filepath, uuid4()) + ensure_directory(filepath) + with open(temppath, 'wb') as f: + f.write(output) + os.rename(temppath, filepath) + self.logger.info("Wrote batch {}".format(filepath)) + + def stop(self): + self.client.stop() + + +def main(channel, nick, oauth_token_path, base_dir='/mnt'): + with open(oauth_token_path) as f: + oauth_token = f.read() + name = socket.gethostname() + + archiver = Archiver(name, base_dir, channel, nick, oauth_token) + + gevent.signal_handler(signal.SIGTERM, archiver.stop) + + logging.info("Starting") + archiver.run() + logging.info("Gracefully stopped") diff --git a/chat_archiver/girc b/chat_archiver/girc new file mode 160000 index 0000000..505ec0e --- /dev/null +++ b/chat_archiver/girc @@ -0,0 +1 @@ +Subproject commit 505ec0eb3c0a46adfad62383d9634fe971a430d5 diff --git a/chat_archiver/setup.py b/chat_archiver/setup.py new file mode 100644 index 0000000..80c6483 --- /dev/null +++ b/chat_archiver/setup.py @@ -0,0 +1,14 @@ +from setuptools import setup, find_packages + +setup( + name='chat_archiver', + version='0.0.1', + author='Mike Lang', + author_email='mikelang3000@gmail.com', + description='', + packages=find_packages(), + install_requires=[ + 'argh', + 'gevent', + ], +) diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet index d966d01..c1c4678 100644 --- a/docker-compose.jsonnet +++ b/docker-compose.jsonnet @@ -170,6 +170,14 @@ "youtube-manual", ], + chat_archiver:: { + image: "ghcr.io/ekimekim/wubloader-downloader:chat-archiver-hack-1", + channel: "desertbus", + user: "dbvideostriketeam", + logs_path: "%s/chat_logs" % $.segments_path, + token_path: "./chat_token.txt". + }, + // Extra options to pass via environment variables, // eg. log level, disabling stack sampling. env:: { @@ -441,6 +449,14 @@ [if $.db_standby then "command"]: ["/standby_setup.sh"], }, + [if $.chat_archiver != "null" then "chat_archiver"]: { + local c = $.chat_archiver, + image: c.image, + restart: "always", + command: [c.channel, c.user, "/token"], + volumes: ["%s:/mnt" % c.logs_path, "%s:/token" % c.token_path], + }, + }, }