diff --git a/chat_archiver/chat_archiver/main.py b/chat_archiver/chat_archiver/main.py index a40f81c..291d343 100644 --- a/chat_archiver/chat_archiver/main.py +++ b/chat_archiver/chat_archiver/main.py @@ -1,5 +1,4 @@ -from calendar import timegm import base64 import hashlib import json @@ -8,7 +7,9 @@ import os import signal import socket import time +from calendar import timegm from datetime import datetime +from itertools import count from uuid import uuid4 import gevent.event @@ -55,6 +56,7 @@ class Archiver(object): self.path = os.path.join(base_dir, channel) self.stopping = gevent.event.Event() + self.got_reconnect = gevent.event.Event() self.client = Client( hostname='irc.chat.twitch.tv', port=6697, @@ -74,8 +76,17 @@ class Archiver(object): @self.client.handler(command='ROOMSTATE', sync=True) def register_real_handler(client, message): self.client.handler(lambda c, m: self.messages.put(m), sync=True) + self.logger.info("Client started") return True + # Twitch sends a RECONNECT shortly before terminating the connection from the server side. + # This gives us time to start up a new instance of the archiver while keeping this one + # running, so that we can be sure we don't miss anything. This will cause duplicate batches, + # but those will get merged later. + @self.client.handler(command='RECONNECT') + def handle_reconnect(client, message): + self.got_reconnect.set() + self.client.start() last_server_time = None @@ -83,7 +94,7 @@ class Archiver(object): # {batch time: [messages]} batches = {} - while not self.stopping.is_set(): + while not (self.stopping.is_set() and self.messages.empty()): # wait until we either have a message, are stopping, or a batch can be closed if batches: next_batch_close = min(batches.keys()) + BATCH_INTERVAL + MAX_SERVER_LAG @@ -168,6 +179,7 @@ class Archiver(object): write_batch(self.path, batch_time, messages) self.client.wait_for_stop() # re-raise any errors + self.logger.info("Client stopped") def stop(self): self.client.stop() @@ -376,12 +388,30 @@ def merge_messages(left, right): def main(channel, nick, oauth_token_path, base_dir='/mnt'): with open(oauth_token_path) as f: oauth_token = f.read() - name = socket.gethostname() + # To ensure uniqueness even if multiple instances are running on the same host, + # also include our pid + name = "{}.{}".format(socket.gethostname(), os.getpid()) - archiver = Archiver(name, base_dir, channel, nick, oauth_token) - - gevent.signal_handler(signal.SIGTERM, archiver.stop) + stopping = gevent.event.Event() + gevent.signal_handler(signal.SIGTERM, stopping.set) logging.info("Starting") - archiver.run() + for index in count(): + # To ensure uniqueness between clients, include a client number + archiver = Archiver("{}.{}".format(name, index), base_dir, channel, nick, oauth_token) + worker = gevent.spawn(archiver.run) + # wait for either graceful exit, error, or for a signal from the archiver that a reconnect was requested + gevent.wait([stopping, worker, archiver.got_reconnect], count=1) + if stopping.is_set(): + archiver.stop() + worker.get() + break + # if got reconnect, discard the old archiver (we don't care even if it fails after this) + # and make a new one + if archiver.got_reconnect.is_set(): + logging.info("Got RECONNECT, creating new client while waiting for old one to finish") + continue + # the only remaining case is that the worker failed, re-raise + worker.get() + assert False, "Not stopping, but worker exited successfully" logging.info("Gracefully stopped")