chat_archiver: Add code support for archiving multiple channels at once

* Join every channel on connect
* Move the "wait for initial ROOMSTATE" logic into the main loop and make it per-channel
* Make batch keys (channel, time) instead of just time

For now the CLI doesn't actually allow you to run chat_archiver in this mode,
it always calls the implementation with a 1-element list of channels.
pull/353/head
Mike Lang 1 year ago committed by Mike Lang
parent b050b71036
commit 200d2df9ba

@ -62,8 +62,8 @@ messages_received = prom.Counter(
messages_ignored = prom.Counter(
"messages_ignored",
"Number of chat messages that were recieved but ignored due to not being on the allowlist.",
["channel", "client", "command"],
"Number of chat messages that were recieved but ignored for some reason (see reason label)",
["client", "command", "reason"],
)
messages_written = prom.Counter(
@ -112,13 +112,12 @@ merge_pass_merges = prom.Histogram(
)
class Archiver(object):
def __init__(self, name, base_dir, channel, nick, oauth_token):
def __init__(self, name, base_dir, channels, nick, oauth_token):
self.logger = logging.getLogger(type(self).__name__).getChild(name)
self.name = name
self.messages = gevent.queue.Queue()
self.channel = channel
self.channels = channels
self.base_dir = base_dir
self.path = os.path.join(base_dir, channel, "chat")
self.stopping = gevent.event.Event()
self.got_reconnect = gevent.event.Event()
@ -131,33 +130,31 @@ class Archiver(object):
twitch=True,
stop_handler=lambda c: self.stopping.set(),
)
for channel in self.channels:
self.client.channel('#{}'.format(channel)).join()
def write_batch(self, batch_time, messages):
def channel_path(self, channel):
return os.path.join(self.base_dir, channel, "chat")
def write_batch(self, channel, batch_time, messages):
# wrapper around general write_batch() function
write_batch(
self.path, batch_time, messages,
size_histogram=batch_bytes.labels(channel=self.channel, client=self.name),
self.channel_path(channel), batch_time, messages,
size_histogram=batch_bytes.labels(channel=channel, client=self.name),
)
batch_messages.labels(channel=self.channel, client=self.name).observe(len(messages))
batch_messages.labels(channel=channel, client=self.name).observe(len(messages))
# incrementing a prom counter can be stupidly expensive, collect up per-command values
# so we can do them in one go
by_command = defaultdict(lambda: 0)
for message in messages:
by_command[message["command"]] += 1
for command, count in by_command.items():
messages_written.labels(channel=self.channel, client=self.name, command=command).inc(count)
messages_written.labels(channel=channel, client=self.name, command=command).inc(count)
def run(self):
# wait for twitch to send the initial ROOMSTATE for the room we've joined.
# everything preceeding this message is per-connection stuff we don't care about.
# once we get it, we register the handler to put everything following onto the
# message queue.
@self.client.handler(command='ROOMSTATE', sync=True)
def register_real_handler(client, message):
self.client.handler(lambda c, m: self.messages.put(m), sync=True)
self.logger.info("Client started")
return True
@self.client.handler(sync=True)
def handle_message(client, message):
self.messages.put(message)
# Twitch sends a RECONNECT shortly before terminating the connection from the server side.
# This gives us time to start up a new instance of the archiver while keeping this one
@ -171,15 +168,27 @@ class Archiver(object):
last_server_time = None
last_timestamped_message = None
# {batch time: [messages]}
# {(channel, batch time): [messages]}
batches = {}
open_batches.labels(channel=self.channel, client=id(self)).set_function(lambda: len(batches))
for channel in self.channels:
open_batches.labels(channel=channel, client=self.name).set_function(
lambda: len(1 for c, t in batches if c == channel)
)
# Tracks if we've seen the initial ROOMSTATE for each channel we've joined.
# Everything up to and including this message is per-connection:
# - a JOIN for us joining the room (even if we were already there on another connection)
# - a USERSTATE for our user
# - a ROOMSTATE for the room
# We ignore all messages before the initial ROOMSTATE.
initialized_channels = set()
while not (self.stopping.is_set() and self.messages.empty()):
# wait until we either have a message, are stopping, or a batch can be closed
if batches:
next_batch_close = min(batches.keys()) + BATCH_INTERVAL + MAX_SERVER_LAG
self.logger.debug("Next batch close at {} (batch times: {})".format(next_batch_close, batches.keys()))
oldest_batch_time = min(batch_time for channel, batch_time in batches.keys())
next_batch_close = oldest_batch_time + BATCH_INTERVAL + MAX_SERVER_LAG
self.logger.debug("Next batch close at {} (batch times: {})".format(next_batch_close, list(batches.keys())))
timeout = max(0, next_batch_close - time.time())
else:
timeout = None
@ -188,10 +197,10 @@ class Archiver(object):
# close any closable batches
now = time.time()
for batch_time, messages in list(batches.items()):
for (channel, batch_time), messages in list(batches.items()):
if now >= batch_time + BATCH_INTERVAL + MAX_SERVER_LAG:
del batches[batch_time]
self.write_batch(batch_time, messages)
del batches[channel, batch_time]
self.write_batch(channel, batch_time, messages)
# consume a message if any
try:
@ -203,7 +212,28 @@ class Archiver(object):
if message.command not in COMMANDS:
self.logger.info("Skipping non-whitelisted command: {}".format(message.command))
messages_ignored.labels(channel=self.channel, client=self.name, command=message.command).inc()
messages_ignored.labels(client=self.name, command=message.command, reason="non-whitelisted").inc()
continue
# For all message types we capture, the channel name is always the first param.
if not message.params:
self.logger.error(f"Skipping malformed message with no params - cannot determine channel: {message}")
messages_ignored.labels(client=self.name, command=message.command, reason="no-channel").inc()
continue
channel = message.params[0].lstrip("#")
if channel not in self.channels:
self.logger.error(f"Skipping unexpected message for unrequested channel {channel}")
messages_ignored.labels(client=self.name, command=message.command, reason="bad-channel").inc()
continue
if channel not in initialized_channels:
self.logger.debug(f"Skipping {message.command} message on non-initialized channel {channel}")
if message.command == "ROOMSTATE":
initialized_channels.add(channel)
self.logger.info(f"Channel {channel} is ready")
messages_ignored.labels(client=self.name, command=message.command, reason="non-initialized-channel").inc()
continue
data = {
@ -212,7 +242,7 @@ class Archiver(object):
}
data['receivers'] = {self.name: message.received_at}
self.logger.debug("Got message data: {}".format(data))
messages_received.labels(channel=self.channel, client=self.name, command=message.command).inc()
messages_received.labels(channel=channel, client=self.name, command=message.command).inc()
if data['tags'] and data['tags'].get('emotes', '') != '':
emote_specs = data['tags']['emotes'].split('/')
@ -224,14 +254,14 @@ class Archiver(object):
timestamp = int(data['tags']['tmi-sent-ts']) / 1000. # original is int ms
last_timestamped_message = message
last_server_time = timestamp
server_lag.labels(channel=self.channel, client=self.name).set(time.time() - timestamp)
server_lag.labels(channel=channel, client=self.name).set(time.time() - timestamp)
time_range = 0
self.logger.debug("Message has exact timestamp: {}".format(timestamp))
# check for any non-timestamped messages which we now know must have been
# before this message. We need to check this batch and the previous.
batch_time = int(timestamp / BATCH_INTERVAL) * BATCH_INTERVAL
for batch in (batch_time, batch_time - BATCH_INTERVAL):
for msg in batches.get(batch, []):
for msg in batches.get((channel, batch), []):
time_between = timestamp - msg['time']
if 0 < time_between < msg['time_range']:
self.logger.debug("Updating previous message {m[command]}@{m[time]} range {m[time_range]} -> {new}".format(
@ -262,11 +292,11 @@ class Archiver(object):
data['time'] = timestamp
data['time_range'] = time_range
batch_time = int(timestamp / BATCH_INTERVAL) * BATCH_INTERVAL
batches.setdefault(batch_time, []).append(data)
batches.setdefault((channel, batch_time), []).append(data)
# Close any remaining batches
for batch_time, messages in batches.items():
self.write_batch(batch_time, messages)
for (channel, batch_time), messages in batches.items():
self.write_batch(channel, batch_time, messages)
self.client.wait_for_stop() # re-raise any errors
self.logger.info("Client stopped")
@ -450,7 +480,7 @@ def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None, merge_inte
logging.info("Starting")
for index in count():
# To ensure uniqueness between clients, include a client number
archiver = Archiver("{}.{}".format(name, index), base_dir, channel, nick, oauth_token)
archiver = Archiver("{}.{}".format(name, index), base_dir, [channel], nick, oauth_token)
worker = gevent.spawn(archiver.run)
# wait for either graceful exit, error, or for a signal from the archiver that a reconnect was requested
gevent.wait([stopping, worker, merger, archiver.got_reconnect], count=1)

Loading…
Cancel
Save