chat-archiver: Merge all files every minute

pull/300/head
Mike Lang 2 years ago committed by Mike Lang
parent 4cfc362f76
commit c25d79e7c2

@ -19,6 +19,7 @@ import gevent.queue
from common import ensure_directory
from girc import Client
from monotonic import monotonic
# How long each batch is
BATCH_INTERVAL = 60
@ -239,25 +240,44 @@ def get_batch_files(path, batch_time):
]
def merge_all(path, interval=None, stopping=None):
	"""Repeatedly scans the batch directory for batch files with the same batch time, and merges them.

	By default, returns once it finds no duplicate files.
	If interval is given, re-scans after that number of seconds.
	If a gevent.event.Event() is passed in as stopping arg, returns when that event is set.
	"""
	if stopping is None:
		# nothing will ever set this, but it's easier to not special-case it everywhere
		stopping = gevent.event.Event()
	while not stopping.is_set():
		start = monotonic()
		# loop until no changes: a merge pass may itself leave duplicates behind
		# (e.g. new files landing mid-scan), so keep scanning until a pass is clean.
		while True:
			logging.debug("Scanning for merges")
			# Count batch files per batch timestamp. Layout is path/<hour>/<name>.json,
			# where <hour> is presumably "%Y-%m-%dT%H" and the filename starts with the
			# in-hour time — TODO confirm against the batch-writing code.
			by_time = {}
			for hour in os.listdir(path):
				for name in os.listdir(os.path.join(path, hour)):
					if not name.endswith(".json"):
						continue
					# NOTE: renamed from `time` — the original shadowed the `time`
					# module, making the `time.strptime` call below an AttributeError.
					time_str = "-".join(name.split("-")[:3])
					timestamp = "{}:{}".format(hour, time_str)
					by_time[timestamp] = by_time.get(timestamp, 0) + 1
			if not any(count > 1 for timestamp, count in by_time.items()):
				logging.info("All batches are merged")
				break
			for timestamp, count in by_time.items():
				if count > 1:
					logging.info("Merging {} batches at time {}".format(count, timestamp))
					batch_time = timegm(time.strptime(timestamp, "%Y-%m-%dT%H:%M:%S"))
					merge_batch_files(path, batch_time)
		if interval is None:
			# one-shot
			break
		# Schedule the next scan relative to when this one *started*, so scan
		# duration doesn't drift the cadence; wake early if asked to stop.
		next_run = start + interval
		remaining = next_run - monotonic()
		if remaining > 0:
			logging.debug("Waiting {}s for next merge scan".format(remaining))
			stopping.wait(remaining)
def merge_batch_files(path, batch_time):
@ -399,7 +419,7 @@ def merge_messages(left, right):
return result
# NOTE(review): this span is a rendered diff hunk — removed (old) and added (new)
# lines are interleaved with no +/- markers, and "@ -a,b +c,d @" lines are hunk
# headers marking gaps where unchanged lines are not shown.
# Old signature (removed by the commit):
def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None):
# New signature: adds merge_interval, seconds between background merge scans.
def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None, merge_interval=60):
	# Read the IRC OAuth token from the given file path.
	with open(oauth_token_path) as f:
		oauth_token = f.read()
	# To ensure uniqueness even if multiple instances are running on the same host,
# hunk header — intervening unchanged lines are not visible here.
@ -411,13 +431,19 @@ def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None):
	# Event used to coordinate graceful shutdown; SIGTERM sets it.
	stopping = gevent.event.Event()
	gevent.signal_handler(signal.SIGTERM, stopping.set)
	# New in this commit: spawn the merge loop as a background greenlet that
	# re-scans every merge_interval seconds until `stopping` is set.
	merger = gevent.spawn(merge_all,
		os.path.join(base_dir, channel, "chat"),
		interval=merge_interval,
		stopping=stopping
	)
	logging.info("Starting")
	for index in count():
		# To ensure uniqueness between clients, include a client number
		archiver = Archiver("{}.{}".format(name, index), base_dir, channel, nick, oauth_token)
		worker = gevent.spawn(archiver.run)
		# wait for either graceful exit, error, or for a signal from the archiver that a reconnect was requested
# Old wait (removed): did not watch the merger greenlet.
		gevent.wait([stopping, worker, archiver.got_reconnect], count=1)
# New wait: also wakes if the merger dies, so its failure stops the service.
		gevent.wait([stopping, worker, merger, archiver.got_reconnect], count=1)
		if stopping.is_set():
			archiver.stop()
			worker.get()
# hunk header — intervening unchanged lines are not visible here.
@ -427,7 +453,13 @@ def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None):
		if archiver.got_reconnect.is_set():
			logging.info("Got RECONNECT, creating new client while waiting for old one to finish")
			continue
# Old comment (removed):
		# the only remaining case is that the worker failed, re-raise
# New failure path: either worker or merger died; stop everything, then
# re-raise whichever greenlet failed via .get().
		# the only remaining case is that something failed. stop everything and re-raise.
		logging.warning("Stopping due to worker dying")
		stopping.set()
		worker.join()
		merger.join()
		# at least one of these two should raise
# Old lines (removed): only the worker was checked.
		worker.get()
		assert False, "Not stopping, but worker exited successfully"
# New lines: check the merger too; reaching the assert means neither raised.
		merger.get()
		assert False, "Worker unexpectedly exited successfully"
	logging.info("Gracefully stopped")

@ -10,5 +10,6 @@ setup(
install_requires=[
'argh',
'gevent',
'monotonic',
],
)

Loading…
Cancel
Save