From 260293d40d75b5f8b95ee6bffe4b89d5f5d43288 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Mon, 30 Oct 2023 22:58:06 +1100
Subject: [PATCH] chat_archiver: Allow passing multiple channels on CLI

---
 chat_archiver/chat_archiver/main.py | 35 ++++++++++++++++++++---------------
 docker-compose.jsonnet              |  2 +-
 2 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/chat_archiver/chat_archiver/main.py b/chat_archiver/chat_archiver/main.py
index 56fa21f..52759a8 100644
--- a/chat_archiver/chat_archiver/main.py
+++ b/chat_archiver/chat_archiver/main.py
@@ -457,7 +457,7 @@ def merge_batch_files(path, batch_time):
 		os.remove(batch_file)
 
 
-def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008):
+def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008):
 	with open(oauth_token_path) as f:
 		oauth_token = f.read()
 	# To ensure uniqueness even if multiple instances are running on the same host,
@@ -470,24 +470,28 @@ def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None, merge_inte
 	stopping = gevent.event.Event()
 	gevent.signal_handler(signal.SIGTERM, stopping.set)
 
-	merger = gevent.spawn(merge_all,
-		os.path.join(base_dir, channel, "chat"),
-		interval=merge_interval,
-		stopping=stopping
-	)
+	mergers = [
+		gevent.spawn(merge_all,
+			os.path.join(base_dir, channel, "chat"),
+			interval=merge_interval,
+			stopping=stopping
+		) for channel in channels
+	]
 
 	prom.start_http_server(metrics_port)
 	logging.info("Starting")
 	for index in count():
 		# To ensure uniqueness between clients, include a client number
-		archiver = Archiver("{}.{}".format(name, index), base_dir, [channel], nick, oauth_token)
-		worker = gevent.spawn(archiver.run)
+		archiver = Archiver("{}.{}".format(name, index), base_dir, channels, nick, oauth_token)
+		archive_worker = gevent.spawn(archiver.run)
+		workers = mergers + [archive_worker]
 		# wait for either graceful exit, error, or for a signal from the archiver that a reconnect was requested
-		gevent.wait([stopping, worker, merger, archiver.got_reconnect], count=1)
+		gevent.wait([stopping, archiver.got_reconnect] + workers, count=1)
 		if stopping.is_set():
 			archiver.stop()
-			worker.get()
+			for worker in workers:
+				worker.get()
 			break
 		# if got reconnect, discard the old archiver (we don't care even if it fails after this)
 		# and make a new one
@@ -497,10 +501,11 @@
 		# the only remaining case is that something failed. stop everything and re-raise.
 		logging.warning("Stopping due to worker dying")
 		stopping.set()
-		worker.join()
-		merger.join()
-		# at least one of these two should raise
-		worker.get()
-		merger.get()
+		archiver.stop()
+		for worker in workers:
+			worker.join()
+		# at least one of these should raise
+		for worker in workers:
+			worker.get()
 		assert False, "Worker unexpectedly exited successfully"
 	logging.info("Gracefully stopped")
diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet
index 13ee196..84563a9 100644
--- a/docker-compose.jsonnet
+++ b/docker-compose.jsonnet
@@ -528,7 +528,7 @@
 	[if $.enabled.chat_archiver then "chat_archiver"]: {
 		image: $.get_image("chat_archiver"),
 		restart: "always",
-		command: [$.chat_archiver.channel, $.chat_archiver.user, "/token", "--name", $.localhost],
+		command: [$.chat_archiver.user, "/token", $.chat_archiver.channel, "--name", $.localhost],
 		volumes: ["%s:/mnt" % $.segments_path, "%s:/token" % $.chat_archiver.token_path],
 		[if "chat_archiver" in $.ports then "ports"]: ["%s:8008" % $.ports.chat_archiver],
 		environment: $.env,
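
For context, a minimal usage sketch (not part of the patch) of the argument order the reworked entry point now expects: nick, OAuth token path, then any number of channels, matching the reordered command in docker-compose.jsonnet. The nick, channel names, and client name below are illustrative placeholders, and the import path assumes the chat_archiver package is installed as laid out in the repo.

# Usage sketch only: nick, channels and name are placeholders, not values from the patch.
from chat_archiver.main import main

# One merger greenlet is spawned per channel; a single Archiver client handles all channels.
# This call blocks until stopped (SIGTERM) or a worker dies.
main(
	"examplebot",        # nick (placeholder)
	"/token",            # oauth_token_path, as mounted by docker-compose.jsonnet
	"channel_one",       # first channel (placeholder)
	"channel_two",       # further channels are just extra positional arguments
	name="examplehost",  # optional client name (placeholder)
)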