chat-archiver: Early work and basic archival

pull/300/head
Mike Lang 3 years ago committed by Mike Lang
parent 6b9d8ab785
commit 0756539b85

3
.gitmodules vendored

@ -0,0 +1,3 @@
[submodule "chat_archiver/girc"]
path = chat_archiver/girc
url = https://github.com/ekimekim/girc

@ -0,0 +1,6 @@
FROM quay.io/ekimekim/wubloader-downloader:32138bb
COPY girc /tmp/girc
RUN pip install /tmp/girc && rm -r /tmp/girc
COPY . /tmp/archiver
RUN pip install /tmp/archiver && rm -r /tmp/archiver
ENTRYPOINT ["python", "-m", "chat_archiver", "--base-dir", "/mnt"]

@ -0,0 +1,53 @@
Chat archiver records messages from TMI (the twitch messaging system)
in a way that preserves as much context as possible, and allows multiple independently-recorded
streams to be combined to ensure nothing was missed.
We store messages in newline-delimited JSON files, in timestamp order.
Each file covers one minute of messages.
These files are named by their timestamp + hash and merged with other files via a CRDT model.
CRDT means you have a merge operation (.) such that
(A.B).C == A.(B.C) (associative)
A.B == B.A (commutitive)
A.A == A (reflexive)
We have a few different kinds of messages to deal with:
Messages with a unique id and timestamp
eg. PRIVMSG, USERNOTICE.
These are easy, as they have a canonical timestamp and can be trivially deduplicated by id.
Messages with an implied ordering
eg. ROOMSTATE, NOTICE, CLEARCHAT.
These messages arrive in what we assume is a consistent order on all clients,
but have no direct associated timestamp. We thus set a timestamp *range* for when
the message could have occurred from the server's perspective, between the last known
server timestamp (since it must be after that message) and the next received server timestamp
(since it must be before that message). We can set some reasonable timeout here in case
we don't receive another message within a short time window.
Messages with a timestamp range are ordered by their timestamp start.
This also governs which file they are in if their range overlaps two files.
Messages known to be out of order
This is specific to JOINs and PARTs.
Twitch documents that these may be delayed by up to 10sec.
So we follow the rules as per messages with implied ordering,
except we pad the possible start time by 10 seconds.
How to merge two files
In general, if the same message (all non-receiver fields identical) is present in both files,
it is included once in the output. For messages with unique ids, this is all that's needed.
For messages without unique ids, we face the question of "is this the same message".
All the following must be true:
* All non-timestamp, non-receiver fields match
* Timestamp ranges overlap
If a message may match multiple messages on the other side with these rules, then
we pick one arbitrarily.
We then merge these messages, setting the timestamp range to the intersection of the inputs.
Literal edge cases: Timestamp ranges that span two files
It may be the case that we can match a message whose timestamp range overhangs file A
with a message near the start of file B. So whenever we are merging files, we need to
also consider the file before and the file after on our side.
In all cases when we merge two messages, we should merge the receiver timestamp field which maps
the receiver id to the timestamp it received the message. This preserves message provedence.
All files are stored in newline-delimited, canonicalised JSON so we can use hashes to compare them.
It should always be true that merging B into A and merging A into B should produce identical files
with the same hash (effects of neighboring files notwithstanding - that just means more merges will
be needed in order to stabilize).

@ -0,0 +1,15 @@
import gevent.monkey
gevent.monkey.patch_all()
import logging
import os
import argh
from .main import main
LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s"
level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=level, format=LOG_FORMAT)
argh.dispatch_command(main)

@ -0,0 +1,209 @@
import base64
import hashlib
import json
import logging
import os
import signal
import socket
import time
from datetime import datetime
from uuid import uuid4
import gevent.event
import gevent.queue
from common import ensure_directory
from girc import Client
class Archiver(object):
# These are known to arrive up to 10s after their actual time
DELAYED_COMMANDS = [
"JOIN",
"PART",
]
COMMANDS = DELAYED_COMMANDS + [
"PRIVMSG",
"CLEARCHAT",
"CLEARMSG",
"HOSTTARGET",
"NOTICE",
"ROOMSTATE",
"USERNOTICE",
"USERSTATE",
]
# How long each batch is
BATCH_INTERVAL = 60
# Assume we're never more than this amount of time behind the server time
# Worst case if too low: multiple output files for same batch that need merging later
MAX_SERVER_LAG = 30
# When guessing when a non-timestamped event occurred, pad the possible range
# by up to this amount before and after our best guess
ESTIMATED_TIME_PADDING = 5
def __init__(self, name, base_dir, channel, nick, oauth_token):
self.logger = logging.getLogger(type(self).__name__).getChild(channel)
self.name = name
self.messages = gevent.queue.Queue()
self.path = os.path.join(base_dir, channel)
self.stopping = gevent.event.Event()
self.client = Client(
hostname='irc.chat.twitch.tv',
port=6697,
ssl=True,
nick=nick,
password=oauth_token,
twitch=True,
stop_handler=lambda c: self.stopping.set(),
)
self.client.channel('#{}'.format(channel)).join()
def run(self):
# wait for twitch to send the initial ROOMSTATE for the room we've joined.
# everything preceeding this message is per-connection stuff we don't care about.
# once we get it, we register the handler to put everything following onto the
# message queue.
@self.client.handler(command='ROOMSTATE', sync=True)
def register_real_handler(client, message):
self.client.handler(lambda c, m: self.messages.put(m), sync=True)
return True
self.client.start()
last_server_time = None
last_timestamped_message = None
# {batch time: [messages]}
batches = {}
while not self.stopping.is_set():
# wait until we either have a message, are stopping, or a batch can be closed
if batches:
next_batch_close = min(batches.keys()) + self.BATCH_INTERVAL + self.MAX_SERVER_LAG
self.logger.debug("Next batch close at {} (batch times: {})".format(next_batch_close, batches.keys()))
timeout = max(0, next_batch_close - time.time())
else:
timeout = None
self.logger.debug("Waiting up to {} for message or stop".format(timeout))
gevent.wait([gevent.spawn(self.messages.peek), self.stopping], count=1, timeout=timeout)
# close any closable batches
now = time.time()
for batch_time, messages in list(batches.items()):
if now >= batch_time + self.BATCH_INTERVAL + self.MAX_SERVER_LAG:
del batches[batch_time]
self.write_batch(batch_time, messages)
# consume a message if any
try:
message = self.messages.get(block=False)
except gevent.queue.Empty:
continue
if message.command not in self.COMMANDS:
self.logger.info("Skipping non-whitelisted command: {}".format(message.command))
continue
self.logger.debug("Got message: {}".format(message))
data = {
attr: getattr(message, attr)
for attr in ('command', 'params', 'sender', 'user', 'host', 'tags')
}
data['receivers'] = {self.name: message.received_at}
self.logger.debug("Got message: {}".format(data))
if data['tags'] and 'tmi-sent-ts' in data['tags']:
# explicit server time is available
timestamp = int(data['tags']['tmi-sent-ts']) / 1000. # original is int ms
last_timestamped_message = message
last_server_time = timestamp
time_range = 0
self.logger.debug("Message has exact timestamp: {}".format(timestamp))
# check for any non-timestamped messages which we now know must have been
# before this message. We need to check this batch and the previous.
batch_time = int(timestamp / self.BATCH_INTERVAL) * self.BATCH_INTERVAL
for batch in (batch_time, batch_time - self.BATCH_INTERVAL):
for msg in batches.get(batch, []):
time_between = timestamp - msg['time']
if 0 < time_between < msg['time_range']:
self.logger.debug("Updating previous message {m[command]}@{m[time]} range {m[time_range]} -> {new}".format(
m=msg, new=time_between,
))
msg['time_range'] = time_between
elif last_server_time is not None:
# estimate current server time based on time since last timestamped message
est_server_time = last_server_time + time.time() - last_timestamped_message.received_at
# pad either side of the estimated server time, use this as a baseline
timestamp = est_server_time - self.ESTIMATED_TIME_PADDING
time_range = 2 * self.ESTIMATED_TIME_PADDING
# if previously timestamped message falls within this range, we know this message
# came after it
timestamp = max(timestamp, last_server_time)
else:
# we have no idea what the server time is, so we guess as 2x the normal padding
# starting from local time.
timestamp = time.time() - 2 * self.ESTIMATED_TIME_PADDING
time_range = 3 * self.ESTIMATED_TIME_PADDING
if data['command'] in self.DELAYED_COMMANDS:
# might have happened 10s sooner than otherwise indicated.
timestamp -= 10
time_range += 10
self.logger.debug("Message time determined as {} + up to {}".format(timestamp, time_range))
data['time'] = timestamp
data['time_range'] = time_range
batch_time = int(timestamp / self.BATCH_INTERVAL) * self.BATCH_INTERVAL
batches.setdefault(batch_time, []).append(data)
# Close any remaining batches
for batch_time, messages in batches.items():
self.write_batch(batch_time, messages)
self.client.wait_for_stop() # re-raise any errors
def write_batch(self, batch_time, messages):
# We need to take some care to have a consistent ordering and format here.
# We use a "canonicalised JSON" format, which is really just whatever the python encoder does,
# with compact separators.
messages = [
(message, json.dumps(message, separators=(',', ':')))
for message in messages
]
# We sort by timestamp, then timestamp range, then if all else fails, lexiographically
# on the encoded representation.
messages.sort(key=lambda item: (item[0]['time'], item[0]['time_range'], item[1]))
output = ("\n".join(line for message, line in messages) + "\n").encode("utf-8")
hash = base64.b64encode(hashlib.sha256(output).digest(), b"-_").decode().rstrip("=")
time = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H:%M:%S")
filename = "{}-{}.json".format(time, hash)
filepath = os.path.join(self.path, filename)
temppath = "{}.{}.temp".format(filepath, uuid4())
ensure_directory(filepath)
with open(temppath, 'wb') as f:
f.write(output)
os.rename(temppath, filepath)
self.logger.info("Wrote batch {}".format(filepath))
def stop(self):
self.client.stop()
def main(channel, nick, oauth_token_path, base_dir='/mnt'):
with open(oauth_token_path) as f:
oauth_token = f.read()
name = socket.gethostname()
archiver = Archiver(name, base_dir, channel, nick, oauth_token)
gevent.signal_handler(signal.SIGTERM, archiver.stop)
logging.info("Starting")
archiver.run()
logging.info("Gracefully stopped")

@ -0,0 +1 @@
Subproject commit 505ec0eb3c0a46adfad62383d9634fe971a430d5

@ -0,0 +1,14 @@
from setuptools import setup, find_packages
setup(
name='chat_archiver',
version='0.0.1',
author='Mike Lang',
author_email='mikelang3000@gmail.com',
description='',
packages=find_packages(),
install_requires=[
'argh',
'gevent',
],
)

@ -170,6 +170,14 @@
"youtube-manual", "youtube-manual",
], ],
chat_archiver:: {
image: "ghcr.io/ekimekim/wubloader-downloader:chat-archiver-hack-1",
channel: "desertbus",
user: "dbvideostriketeam",
logs_path: "%s/chat_logs" % $.segments_path,
token_path: "./chat_token.txt".
},
// Extra options to pass via environment variables, // Extra options to pass via environment variables,
// eg. log level, disabling stack sampling. // eg. log level, disabling stack sampling.
env:: { env:: {
@ -441,6 +449,14 @@
[if $.db_standby then "command"]: ["/standby_setup.sh"], [if $.db_standby then "command"]: ["/standby_setup.sh"],
}, },
[if $.chat_archiver != "null" then "chat_archiver"]: {
local c = $.chat_archiver,
image: c.image,
restart: "always",
command: [c.channel, c.user, "/token"],
volumes: ["%s:/mnt" % c.logs_path, "%s:/token" % c.token_path],
},
}, },
} }

Loading…
Cancel
Save