|
|
|
@ -457,7 +457,7 @@ def merge_batch_files(path, batch_time):
|
|
|
|
|
os.remove(batch_file)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008):
|
|
|
|
|
def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008):
|
|
|
|
|
with open(oauth_token_path) as f:
|
|
|
|
|
oauth_token = f.read()
|
|
|
|
|
# To ensure uniqueness even if multiple instances are running on the same host,
|
|
|
|
@ -470,24 +470,28 @@ def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None, merge_inte
|
|
|
|
|
stopping = gevent.event.Event()
|
|
|
|
|
gevent.signal_handler(signal.SIGTERM, stopping.set)
|
|
|
|
|
|
|
|
|
|
merger = gevent.spawn(merge_all,
|
|
|
|
|
os.path.join(base_dir, channel, "chat"),
|
|
|
|
|
interval=merge_interval,
|
|
|
|
|
stopping=stopping
|
|
|
|
|
)
|
|
|
|
|
mergers = [
|
|
|
|
|
gevent.spawn(merge_all,
|
|
|
|
|
os.path.join(base_dir, channel, "chat"),
|
|
|
|
|
interval=merge_interval,
|
|
|
|
|
stopping=stopping
|
|
|
|
|
) for channel in channels
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
prom.start_http_server(metrics_port)
|
|
|
|
|
|
|
|
|
|
logging.info("Starting")
|
|
|
|
|
for index in count():
|
|
|
|
|
# To ensure uniqueness between clients, include a client number
|
|
|
|
|
archiver = Archiver("{}.{}".format(name, index), base_dir, [channel], nick, oauth_token)
|
|
|
|
|
worker = gevent.spawn(archiver.run)
|
|
|
|
|
archiver = Archiver("{}.{}".format(name, index), base_dir, channels, nick, oauth_token)
|
|
|
|
|
archive_worker = gevent.spawn(archiver.run)
|
|
|
|
|
workers = mergers + [archive_worker]
|
|
|
|
|
# wait for either graceful exit, error, or for a signal from the archiver that a reconnect was requested
|
|
|
|
|
gevent.wait([stopping, worker, merger, archiver.got_reconnect], count=1)
|
|
|
|
|
gevent.wait([stopping, archiver.got_reconnect] + workers, count=1)
|
|
|
|
|
if stopping.is_set():
|
|
|
|
|
archiver.stop()
|
|
|
|
|
worker.get()
|
|
|
|
|
for worker in workers:
|
|
|
|
|
worker.get()
|
|
|
|
|
break
|
|
|
|
|
# if got reconnect, discard the old archiver (we don't care even if it fails after this)
|
|
|
|
|
# and make a new one
|
|
|
|
@ -497,10 +501,11 @@ def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None, merge_inte
|
|
|
|
|
# the only remaining case is that something failed. stop everything and re-raise.
|
|
|
|
|
logging.warning("Stopping due to worker dying")
|
|
|
|
|
stopping.set()
|
|
|
|
|
worker.join()
|
|
|
|
|
merger.join()
|
|
|
|
|
# at least one of these two should raise
|
|
|
|
|
worker.get()
|
|
|
|
|
merger.get()
|
|
|
|
|
archiver.stop()
|
|
|
|
|
for worker in workers:
|
|
|
|
|
worker.join()
|
|
|
|
|
# at least one of these should raise
|
|
|
|
|
for worker in workers:
|
|
|
|
|
worker.get()
|
|
|
|
|
assert False, "Worker unexpectedly exited successfully"
|
|
|
|
|
logging.info("Gracefully stopped")
|
|
|
|
|