diff --git a/merge_v1.py b/merge_v1.py
new file mode 100644
index 0000000..fbc757a
--- /dev/null
+++ b/merge_v1.py
@@ -0,0 +1,112 @@
+import json
+import time
+import hashlib
+
+# Calculates intersection of time range of both messages, or None if they don't overlap
+def overlap(a, b):
+    range_start = max(a['time'], b['time'])
+    range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range'])
+    if range_end < range_start:
+        return None
+    return range_start, range_end - range_start
+
+# Returns merged message if two messages are compatible with being the same message,
+# or else None.
+def merge_message(a, b):
+    o = overlap(a, b)
+    if o and all(
+        a.get(k) == b.get(k)
+        for k in set(a.keys()) | set(b.keys())
+        if k not in ("receivers", "time", "time_range")
+    ):
+        receivers = a["receivers"] | b["receivers"]
+        # Error checking - make sure no receiver timestamps in a or b are overwritten
+        # by the dict union above. That would indicate we're merging two messages
+        # received at different times by the same recipient.
+        for k in receivers.keys():
+            for old in (a, b):
+                if k in old["receivers"] and old["receivers"][k] != receivers[k]:
+                    raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
+        return a | {
+            "time": o[0],
+            "time_range": o[1],
+            "receivers": receivers,
+        }
+    return None
+
+
+def merge_messages(left, right):
+    # An optimization - if either side is empty, return the other side without processing.
+    if not left:
+        return right
+    if not right:
+        return left
+
+    # Match things with identical ids first, and collect unmatched into left and right lists
+    by_id = {}
+    unmatched = [], []
+    for messages, u in zip((left, right), unmatched):
+        for message in messages:
+            id = (message.get('tags') or {}).get('id')
+            if id:
+                by_id.setdefault(id, []).append(message)
+            else:
+                u.append(message)
+
+    result = []
+    for id, messages in by_id.items():
+        if len(messages) == 1:
+            result.append(messages[0])
+        else:
+            merged = merge_message(*messages)
+            if merged is None:
+                raise ValueError(f"Got two non-matching messages with id {id}: {messages[0]}, {messages[1]}")
+            result.append(merged)
+
+    # For time-range messages, pair off each one in left with first match in right,
+    # and pass through anything with no matches.
+    left_unmatched, right_unmatched = unmatched
+    for message in left_unmatched:
+        for other in right_unmatched:
+            merged = merge_message(message, other)
+            if merged:
+                right_unmatched.remove(other)
+                result.append(merged)
+                break
+        else:
+            result.append(message)
+    for message in right_unmatched:
+        result.append(message)
+
+    return result
+
+
+def load(file):
+    with open(file) as f:
+        return [json.loads(line) for line in f.read().strip().split("\n")]
+
+
+def main(*files):
+    out = False
+    if files and files[0] == "--out":
+        files = files[1:]
+        out = True
+    batches = [load(file) for file in files]
+    result = batches[0]
+    start = time.monotonic()
+    for batch in batches[1:]:
+        result = merge_messages(result, batch)
+    interval = time.monotonic() - start
+    # v1 version must be corrected to have messages in time order, which v2 does by default
+    # but causes problems when comparing.
+    # We don't count this against the execution time.
+    result.sort(key=lambda o: o["time"])
+    if out:
+        print(json.dumps(result))
+    else:
+        hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
+        print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
+
+if __name__ == '__main__':
+    import sys
+    main(*sys.argv[1:])
diff --git a/merge_v2.py b/merge_v2.py
new file mode 100644
index 0000000..2751d92
--- /dev/null
+++ b/merge_v2.py
@@ -0,0 +1,109 @@
+import json
+import time
+import hashlib
+
+# Calculates intersection of time range of both messages, or None if they don't overlap
+def overlap(a, b):
+    range_start = max(a['time'], b['time'])
+    range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range'])
+    if range_end < range_start:
+        return None
+    return range_start, range_end - range_start
+
+# Returns merged message if two messages are compatible with being the same message,
+# or else None.
+def merge_message(a, b):
+    o = overlap(a, b)
+    if o and all(
+        a.get(k) == b.get(k)
+        for k in set(a.keys()) | set(b.keys())
+        if k not in ("receivers", "time", "time_range")
+    ):
+        receivers = a["receivers"] | b["receivers"]
+        # Error checking - make sure no receiver timestamps in a or b are overwritten
+        # by the dict union above. That would indicate we're merging two messages
+        # received at different times by the same recipient.
+        for k in receivers.keys():
+            for old in (a, b):
+                if k in old["receivers"] and old["receivers"][k] != receivers[k]:
+                    raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
+        return a | {
+            "time": o[0],
+            "time_range": o[1],
+            "receivers": receivers,
+        }
+    return None
+
+def merge_messages(left, right):
+    if not left:
+        return right
+    if not right:
+        return left
+
+    result = []
+    while left and right:
+        # Find earliest message out of left and right.
+        # The other side becomes the candidate messages.
+        if left[0]['time'] <= right[0]['time']:
+            message, candidates = left.pop(0), right
+        else:
+            message, candidates = right.pop(0), left
+
+        # Scan candidates for matching message until found or we know there is no match
+        id = (message.get("tags") or {}).get("id")
+        end = message['time'] + message['time_range']
+        merged = None
+        for index, candidate in enumerate(candidates):
+            # optimization: stop when earliest candidate is after message's end time
+            if end < candidate['time']:
+                break
+            if id is None:
+                merged = merge_message(message, candidate)
+                if merged is not None:
+                    candidates.pop(index)
+                    break
+            elif (candidate.get("tags") or {}).get("id") == id:
+                merged = merge_message(message, candidate)
+                if merged is None:
+                    raise ValueError(f"Got two non-matching messages with id {id}: {message}, {candidate}")
+                candidates.pop(index)
+                break
+
+        # If unmatched, just keep original
+        if merged is None:
+            merged = message
+
+        result.append(merged)
+
+    # Once one side is exhausted, the other side must have all remaining messages unmatched.
+    # So just append everything.
+    result += left + right
+
+    return result
+
+
+def load(file):
+    with open(file) as f:
+        return [json.loads(line) for line in f.read().strip().split("\n")]
+
+
+def main(*files):
+    out = False
+    if files and files[0] == "--out":
+        files = files[1:]
+        out = True
+    batches = [load(file) for file in files]
+    result = batches[0]
+    start = time.monotonic()
+    for batch in batches[1:]:
+        result = merge_messages(result, batch)
+    interval = time.monotonic() - start
+    if out:
+        print(json.dumps(result))
+    else:
+        hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
+        print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
+
+if __name__ == '__main__':
+    import sys
+    main(*sys.argv[1:])
diff --git a/merge_v3.py b/merge_v3.py
new file mode 100644
index 0000000..3cc3137
--- /dev/null
+++ b/merge_v3.py
@@ -0,0 +1,104 @@
+import json
+import time
+import hashlib
+
+# Calculates intersection of time range of both messages, or None if they don't overlap
+def overlap(a, b):
+    range_start = max(a['time'], b['time'])
+    range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range'])
+    if range_end < range_start:
+        return None
+    return range_start, range_end - range_start
+
+# Returns merged message if two messages are compatible with being the same message,
+# or else None.
+def merge_message(a, b):
+    o = overlap(a, b)
+    if o and all(
+        a.get(k) == b.get(k)
+        for k in set(a.keys()) | set(b.keys())
+        if k not in ("receivers", "time", "time_range")
+    ):
+        receivers = a["receivers"] | b["receivers"]
+        # Error checking - make sure no receiver timestamps in a or b are overwritten
+        # by the dict union above. That would indicate we're merging two messages
+        # received at different times by the same recipient.
+        for k in receivers.keys():
+            for old in (a, b):
+                if k in old["receivers"] and old["receivers"][k] != receivers[k]:
+                    raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
+        return a | {
+            "time": o[0],
+            "time_range": o[1],
+            "receivers": receivers,
+        }
+    return None
+
+def merge_messages(*batches):
+    batches = [b for b in batches if len(b) > 0]
+
+    result = []
+    while len(batches) > 1:
+        # Find batch with the earliest message
+        first_batch = min(batches, key=lambda b: b[0]["time"])
+        message = first_batch.pop(0)
+        id = (message.get("tags") or {}).get("id")
+
+        # For each other batch, attempt to find a matching message
+        for batch in batches:
+            if batch is first_batch:
+                continue
+            end = message["time"] + message["time_range"]
+            merged = None
+            for index, candidate in enumerate(batch):
+                # optimization: stop when earliest candidate is after message's end time
+                if end < candidate['time']:
+                    break
+                if id is None:
+                    merged = merge_message(message, candidate)
+                    if merged is not None:
+                        batch.pop(index)
+                        break
+                elif (candidate.get("tags") or {}).get("id") == id:
+                    merged = merge_message(message, candidate)
+                    if merged is None:
+                        raise ValueError(f"Got two non-matching messages with id {id}: {message}, {candidate}")
+                    batch.pop(index)
+                    break
+            if merged is not None:
+                message = merged
+
+        result.append(message)
+        batches = [b for b in batches if len(b) > 0]
+
+    # Once all but one batch is exhausted, the last one must have all remaining messages unmatched.
+    # So just append everything.
+    if batches:
+        result += batches[0]
+
+    return result
+
+
+def load(file):
+    with open(file) as f:
+        return [json.loads(line) for line in f.read().strip().split("\n")]
+
+
+def main(*files):
+    out = False
+    if files and files[0] == "--out":
+        files = files[1:]
+        out = True
+    batches = [load(file) for file in files]
+    start = time.monotonic()
+    result = merge_messages(*batches)
+    interval = time.monotonic() - start
+    if out:
+        print(json.dumps(result))
+    else:
+        hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
+        print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
+
+if __name__ == '__main__':
+    import sys
+    main(*sys.argv[1:])
diff --git a/merge_v4.py b/merge_v4.py
new file mode 100644
index 0000000..6983f98
--- /dev/null
+++ b/merge_v4.py
@@ -0,0 +1,132 @@
+import json
+import time
+import hashlib
+
+# Calculates intersection of time range of both messages, or None if they don't overlap
+def overlap(a, b):
+    range_start = max(a['time'], b['time'])
+    range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range'])
+    if range_end < range_start:
+        return None
+    return range_start, range_end - range_start
+
+# Returns merged message if two messages are compatible with being the same message,
+# or else None.
+def merge_message(a, b):
+    o = overlap(a, b)
+    if o and all(
+        a.get(k) == b.get(k)
+        for k in set(a.keys()) | set(b.keys())
+        if k not in ("receivers", "time", "time_range")
+    ):
+        receivers = a["receivers"] | b["receivers"]
+        # Error checking - make sure no receiver timestamps in a or b are overwritten
+        # by the dict union above. That would indicate we're merging two messages
+        # received at different times by the same recipient.
+        for k in receivers.keys():
+            for old in (a, b):
+                if k in old["receivers"] and old["receivers"][k] != receivers[k]:
+                    raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
+        return a | {
+            "time": o[0],
+            "time_range": o[1],
+            "receivers": receivers,
+        }
+    return None
+
+def merge_messages(*batches):
+    batches = [b for b in batches if len(b) > 0]
+
+    result = []
+    while len(batches) > 1:
+        # Shortcut for non-overlapping time ranges - if one batch has messages that end before
+        # the start of any other batch, take those messages directly without trying to match them.
+
+        # Find batches with the earliest and second earliest messages
+        first = batches[0]
+        second = batches[1]
+        if first[0]["time"] > second[0]["time"]:
+            first, second = second, first
+        for batch in batches[2:]:
+            if batch[0]["time"] < first[0]["time"]:
+                first, second = batch, first
+            elif batch[0]["time"] < second[0]["time"]:
+                second = batch
+
+        # Find messages in first batch that end before the start of second batch,
+        # and copy them directly to result.
+        cutoff = second[0]["time"]
+        while first and first[0]["time"] + first[0]["time_range"] < cutoff:
+            result.append(first[0])
+            first.pop(0)
+
+        # If first now overlaps with second, move on to try to find messages to merge.
+        # If no overlap (either first is exhausted, or second now starts sooner than first)
+        # then just start again from the top and look for more non-overlapping ranges.
+        if not first:
+            batches.remove(first)
+            continue
+        if cutoff < first[0]["time"]:
+            continue
+
+        message = first.pop(0)
+        id = (message.get("tags") or {}).get("id")
+
+        # For each other batch, attempt to find a matching message
+        for batch in batches:
+            if batch is first:
+                continue
+            end = message["time"] + message["time_range"]
+            merged = None
+            for index, candidate in enumerate(batch):
+                # optimization: stop when earliest candidate is after message's end time
+                if end < candidate['time']:
+                    break
+                if id is None:
+                    merged = merge_message(message, candidate)
+                    if merged is not None:
+                        batch.pop(index)
+                        break
+                elif (candidate.get("tags") or {}).get("id") == id:
+                    merged = merge_message(message, candidate)
+                    if merged is None:
+                        raise ValueError(f"Got two non-matching messages with id {id}: {message}, {candidate}")
+                    batch.pop(index)
+                    break
+            if merged is not None:
+                message = merged
+
+        result.append(message)
+        batches = [b for b in batches if len(b) > 0]
+
+    # Once all but one batch is exhausted, the last one must have all remaining messages unmatched.
+    # So just append everything.
+    if batches:
+        result += batches[0]
+
+    return result
+
+
+def load(file):
+    with open(file) as f:
+        return [json.loads(line) for line in f.read().strip().split("\n")]
+
+
+def main(*files):
+    out = False
+    if files and files[0] == "--out":
+        files = files[1:]
+        out = True
+    batches = [load(file) for file in files]
+    start = time.monotonic()
+    result = merge_messages(*batches)
+    interval = time.monotonic() - start
+    if out:
+        print(json.dumps(result))
+    else:
+        hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
+        print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
+
+if __name__ == '__main__':
+    import sys
+    main(*sys.argv[1:])