From 96cc212bf0f4949cea81bb8fd9324e3b8bb8fef7 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 28 Jul 2022 18:02:47 +1000 Subject: [PATCH] chat_archiver: fixes, implement merge_all --- chat_archiver/chat_archiver/main.py | 58 ++++++++++++++++++------ chat_archiver/chat_archiver/merge_all.py | 14 ++++++ 2 files changed, 57 insertions(+), 15 deletions(-) create mode 100644 chat_archiver/chat_archiver/merge_all.py diff --git a/chat_archiver/chat_archiver/main.py b/chat_archiver/chat_archiver/main.py index d112894..a40f81c 100644 --- a/chat_archiver/chat_archiver/main.py +++ b/chat_archiver/chat_archiver/main.py @@ -1,4 +1,5 @@ +from calendar import timegm import base64 import hashlib import json @@ -178,12 +179,16 @@ def write_batch(path, batch_time, messages): time = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H:%M:%S") filename = "{}-{}.json".format(time, hash) filepath = os.path.join(path, filename) - temppath = "{}.{}.temp".format(filepath, uuid4()) - ensure_directory(filepath) - with open(temppath, 'wb') as f: - f.write(output) - os.rename(temppath, filepath) - logging.info("Wrote batch {}".format(filepath)) + if os.path.exists(filepath): + logging.info("Not writing batch {} - already exists.".format(filename)) + else: + temppath = "{}.{}.temp".format(filepath, uuid4()) + ensure_directory(filepath) + with open(temppath, 'wb') as f: + f.write(output) + os.rename(temppath, filepath) + logging.info("Wrote batch {}".format(filepath)) + return filepath def format_batch(messages): @@ -210,6 +215,25 @@ def get_batch_files(path, batch_time): ] +def merge_all(path): + """Repeatedly scans the batch directory for batch files with the same batch time, and merges them. + Returns once it finds no duplicate files.""" + while True: + by_time = {} + for name in os.listdir(path): + if not name.endswith(".json"): + continue + timestamp = "-".join(name.split("-")[:3]) + by_time[timestamp] = by_time.get(timestamp, 0) + 1 + if not any(count > 1 for timestamp, count in by_time.items()): + break + for timestamp, count in by_time.items(): + if count > 1: + logging.info("Merging {} batches at time {}".format(count, timestamp)) + batch_time = timegm(time.strptime(timestamp, "%Y-%m-%dT%H:%M:%S")) + merge_batch_files(path, batch_time) + + def merge_batch_files(path, batch_time): """For the given batch time, merges all the following messages: - From batch files at that time @@ -241,15 +265,19 @@ def merge_batch_files(path, batch_time): batch = [json.loads(line) for line in batch.strip().split("\n")] messages = merge_messages(messages, batch) - # sorting by time is needed for group_by(), we'll sort properly on save. - messages.sort(key=lambda message: message['time']) - for batch_time, batch in itertools.group_by(messages, key= - lambda message: int(message['time'] / BATCH_INTERVAL) * BATCH_INTERVAL - ): - write_batch(path, batch_time, batch) + by_time = {} + for message in messages: + batch_time = int(message['time'] / BATCH_INTERVAL) * BATCH_INTERVAL + by_time.setdefault(batch_time, []).append(message) + + written = [] + for batch_time, batch in by_time.items(): + written.append(write_batch(path, batch_time, batch)) for batch_file in batch_files: - os.remove(batch_file) + # don't delete something we just (re-)wrote + if batch_file not in written: + os.remove(batch_file) def merge_messages(left, right): """Merges two lists of messages into one merged list. @@ -289,7 +317,7 @@ def merge_messages(left, right): # by the same recipient. for k in receivers.keys(): for old in (a, b): - if k in old and old[k] != recievers[k]: + if k in old and old[k] != receivers[k]: raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}") return a | { "time": o[0], @@ -303,7 +331,7 @@ def merge_messages(left, right): unmatched = [], [] for messages, u in zip((left, right), unmatched): for message in messages: - id = message.get('tags', {}).get('id') + id = (message.get('tags') or {}).get('id') if id: by_id.setdefault(id, []).append(message) else: diff --git a/chat_archiver/chat_archiver/merge_all.py b/chat_archiver/chat_archiver/merge_all.py new file mode 100644 index 0000000..a9f0e65 --- /dev/null +++ b/chat_archiver/chat_archiver/merge_all.py @@ -0,0 +1,14 @@ + +import argh +import logging +import json + +from .main import merge_messages, format_batch, merge_all + +def main(path, log='INFO'): + """Merge all batch files with the same timestamp within given directory""" + logging.basicConfig(level=log) + merge_all(path) + +if __name__ == '__main__': + argh.dispatch_command(main)