diff --git a/chat_archiver/chat_archiver/main.py b/chat_archiver/chat_archiver/main.py index 9604dc9..43dbf31 100644 --- a/chat_archiver/chat_archiver/main.py +++ b/chat_archiver/chat_archiver/main.py @@ -62,8 +62,8 @@ messages_received = prom.Counter( messages_ignored = prom.Counter( "messages_ignored", - "Number of chat messages that were recieved but ignored due to not being on the allowlist.", - ["channel", "client", "command"], + "Number of chat messages that were recieved but ignored for some reason (see reason label)", + ["client", "command", "reason"], ) messages_written = prom.Counter( @@ -112,13 +112,12 @@ merge_pass_merges = prom.Histogram( ) class Archiver(object): - def __init__(self, name, base_dir, channel, nick, oauth_token): + def __init__(self, name, base_dir, channels, nick, oauth_token): self.logger = logging.getLogger(type(self).__name__).getChild(name) self.name = name self.messages = gevent.queue.Queue() - self.channel = channel + self.channels = channels self.base_dir = base_dir - self.path = os.path.join(base_dir, channel, "chat") self.stopping = gevent.event.Event() self.got_reconnect = gevent.event.Event() @@ -131,33 +130,31 @@ class Archiver(object): twitch=True, stop_handler=lambda c: self.stopping.set(), ) - self.client.channel('#{}'.format(channel)).join() + for channel in self.channels: + self.client.channel('#{}'.format(channel)).join() + + def channel_path(self, channel): + return os.path.join(self.base_dir, channel, "chat") - def write_batch(self, batch_time, messages): + def write_batch(self, channel, batch_time, messages): # wrapper around general write_batch() function write_batch( - self.path, batch_time, messages, - size_histogram=batch_bytes.labels(channel=self.channel, client=self.name), + self.channel_path(channel), batch_time, messages, + size_histogram=batch_bytes.labels(channel=channel, client=self.name), ) - batch_messages.labels(channel=self.channel, client=self.name).observe(len(messages)) + 
batch_messages.labels(channel=channel, client=self.name).observe(len(messages))
 		# incrementing a prom counter can be stupidly expensive, collect up per-command values
 		# so we can do them in one go
 		by_command = defaultdict(lambda: 0)
 		for message in messages:
 			by_command[message["command"]] += 1
 		for command, count in by_command.items():
-			messages_written.labels(channel=self.channel, client=self.name, command=command).inc(count)
+			messages_written.labels(channel=channel, client=self.name, command=command).inc(count)
 
 	def run(self):
-		# wait for twitch to send the initial ROOMSTATE for the room we've joined.
-		# everything preceeding this message is per-connection stuff we don't care about.
-		# once we get it, we register the handler to put everything following onto the
-		# message queue.
-		@self.client.handler(command='ROOMSTATE', sync=True)
-		def register_real_handler(client, message):
-			self.client.handler(lambda c, m: self.messages.put(m), sync=True)
-			self.logger.info("Client started")
-			return True
+		@self.client.handler(sync=True)
+		def handle_message(client, message):
+			self.messages.put(message)
 
 		# Twitch sends a RECONNECT shortly before terminating the connection from the server side.
 		# This gives us time to start up a new instance of the archiver while keeping this one
@@ -171,15 +168,27 @@ class Archiver(object):
 		last_server_time = None
 		last_timestamped_message = None
 
-		# {batch time: [messages]}
+		# {(channel, batch time): [messages]}
 		batches = {}
-		open_batches.labels(channel=self.channel, client=id(self)).set_function(lambda: len(batches))
+		for channel in self.channels:
+			open_batches.labels(channel=channel, client=self.name).set_function(
+				lambda channel=channel: sum(1 for c, t in batches if c == channel)
+			)
+
+		# Tracks if we've seen the initial ROOMSTATE for each channel we've joined.
+ # Everything up to and including this message is per-connection: + # - a JOIN for us joining the room (even if we were already there on another connection) + # - a USERSTATE for our user + # - a ROOMSTATE for the room + # We ignore all messages before the initial ROOMSTATE. + initialized_channels = set() while not (self.stopping.is_set() and self.messages.empty()): # wait until we either have a message, are stopping, or a batch can be closed if batches: - next_batch_close = min(batches.keys()) + BATCH_INTERVAL + MAX_SERVER_LAG - self.logger.debug("Next batch close at {} (batch times: {})".format(next_batch_close, batches.keys())) + oldest_batch_time = min(batch_time for channel, batch_time in batches.keys()) + next_batch_close = oldest_batch_time + BATCH_INTERVAL + MAX_SERVER_LAG + self.logger.debug("Next batch close at {} (batch times: {})".format(next_batch_close, list(batches.keys()))) timeout = max(0, next_batch_close - time.time()) else: timeout = None @@ -188,10 +197,10 @@ class Archiver(object): # close any closable batches now = time.time() - for batch_time, messages in list(batches.items()): + for (channel, batch_time), messages in list(batches.items()): if now >= batch_time + BATCH_INTERVAL + MAX_SERVER_LAG: - del batches[batch_time] - self.write_batch(batch_time, messages) + del batches[channel, batch_time] + self.write_batch(channel, batch_time, messages) # consume a message if any try: @@ -203,7 +212,28 @@ class Archiver(object): if message.command not in COMMANDS: self.logger.info("Skipping non-whitelisted command: {}".format(message.command)) - messages_ignored.labels(channel=self.channel, client=self.name, command=message.command).inc() + messages_ignored.labels(client=self.name, command=message.command, reason="non-whitelisted").inc() + continue + + # For all message types we capture, the channel name is always the first param. 
+ if not message.params: + self.logger.error(f"Skipping malformed message with no params - cannot determine channel: {message}") + messages_ignored.labels(client=self.name, command=message.command, reason="no-channel").inc() + continue + + channel = message.params[0].lstrip("#") + + if channel not in self.channels: + self.logger.error(f"Skipping unexpected message for unrequested channel {channel}") + messages_ignored.labels(client=self.name, command=message.command, reason="bad-channel").inc() + continue + + if channel not in initialized_channels: + self.logger.debug(f"Skipping {message.command} message on non-initialized channel {channel}") + if message.command == "ROOMSTATE": + initialized_channels.add(channel) + self.logger.info(f"Channel {channel} is ready") + messages_ignored.labels(client=self.name, command=message.command, reason="non-initialized-channel").inc() continue data = { @@ -212,7 +242,7 @@ class Archiver(object): } data['receivers'] = {self.name: message.received_at} self.logger.debug("Got message data: {}".format(data)) - messages_received.labels(channel=self.channel, client=self.name, command=message.command).inc() + messages_received.labels(channel=channel, client=self.name, command=message.command).inc() if data['tags'] and data['tags'].get('emotes', '') != '': emote_specs = data['tags']['emotes'].split('/') @@ -224,14 +254,14 @@ class Archiver(object): timestamp = int(data['tags']['tmi-sent-ts']) / 1000. # original is int ms last_timestamped_message = message last_server_time = timestamp - server_lag.labels(channel=self.channel, client=self.name).set(time.time() - timestamp) + server_lag.labels(channel=channel, client=self.name).set(time.time() - timestamp) time_range = 0 self.logger.debug("Message has exact timestamp: {}".format(timestamp)) # check for any non-timestamped messages which we now know must have been # before this message. We need to check this batch and the previous. 
batch_time = int(timestamp / BATCH_INTERVAL) * BATCH_INTERVAL for batch in (batch_time, batch_time - BATCH_INTERVAL): - for msg in batches.get(batch, []): + for msg in batches.get((channel, batch), []): time_between = timestamp - msg['time'] if 0 < time_between < msg['time_range']: self.logger.debug("Updating previous message {m[command]}@{m[time]} range {m[time_range]} -> {new}".format( @@ -262,11 +292,11 @@ class Archiver(object): data['time'] = timestamp data['time_range'] = time_range batch_time = int(timestamp / BATCH_INTERVAL) * BATCH_INTERVAL - batches.setdefault(batch_time, []).append(data) + batches.setdefault((channel, batch_time), []).append(data) # Close any remaining batches - for batch_time, messages in batches.items(): - self.write_batch(batch_time, messages) + for (channel, batch_time), messages in batches.items(): + self.write_batch(channel, batch_time, messages) self.client.wait_for_stop() # re-raise any errors self.logger.info("Client stopped") @@ -450,7 +480,7 @@ def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None, merge_inte logging.info("Starting") for index in count(): # To ensure uniqueness between clients, include a client number - archiver = Archiver("{}.{}".format(name, index), base_dir, channel, nick, oauth_token) + archiver = Archiver("{}.{}".format(name, index), base_dir, [channel], nick, oauth_token) worker = gevent.spawn(archiver.run) # wait for either graceful exit, error, or for a signal from the archiver that a reconnect was requested gevent.wait([stopping, worker, merger, archiver.got_reconnect], count=1)