From 335e29e1bcc72040e6d48923f18de61026e0a2b4 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Tue, 5 Nov 2024 08:11:44 +0100
Subject: [PATCH 1/7] wip:

---
 merge_v1.py | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 merge_v2.py | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 193 insertions(+)
 create mode 100644 merge_v1.py
 create mode 100644 merge_v2.py

diff --git a/merge_v1.py b/merge_v1.py
new file mode 100644
index 0000000..164ee65
--- /dev/null
+++ b/merge_v1.py
@@ -0,0 +1,96 @@
+import json
+import time
+import hashlib
+
+# Calculates intersection of time range of both messages, or None if they don't overlap
+def overlap(a, b):
+	range_start = max(a['time'], b['time'])
+	range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range'])
+	if range_end < range_start:
+		return None
+	return range_start, range_end - range_start
+
+# Returns merged message if two messages are compatible with being the same message,
+# or else None.
+def merge_message(a, b):
+	o = overlap(a, b)
+	if o and all(
+		a.get(k) == b.get(k)
+		for k in set(a.keys()) | set(b.keys())
+		if k not in ("receivers", "time", "time_range")
+	):
+		receivers = a["receivers"] | b["receivers"]
+		# Error checking - make sure no receiver timestamps are being overwritten.
+		# This would indicate we're merging two messages received at different times
+		# by the same recipient.
+		for k in receivers.keys():
+			for old in (a, b):
+				if k in old and old[k] != receivers[k]:
+					raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
+		return a | {
+			"time": o[0],
+			"time_range": o[1],
+			"receivers": receivers,
+		}
+	return None
+
+
+def merge_messages(left, right):
+	# An optimization - if either side is empty, return the other side without processing.
+	if not left:
+		return right
+	if not right:
+		return left
+
+	# Match things with identical ids first, and collect unmatched into left and right lists
+	by_id = {}
+	unmatched = [], []
+	for messages, u in zip((left, right), unmatched):
+		for message in messages:
+			id = (message.get('tags') or {}).get('id')
+			if id:
+				by_id.setdefault(id, []).append(message)
+			else:
+				u.append(message)
+
+	result = []
+	for id, messages in by_id.items():
+		if len(messages) == 1:
+			result.append(messages[0])
+		else:
+			merged = merge_message(*messages)
+			if merged is None:
+				raise ValueError(f"Got two non-matching messages with id {id}: {messages[0]}, {messages[1]}")
+			result.append(merged)
+
+	# For time-range messages, pair off each one in left with first match in right,
+	# and pass through anything with no matches.
+	left_unmatched, right_unmatched = unmatched
+	for message in left_unmatched:
+		for other in right_unmatched:
+			merged = merge_message(message, other)
+			if merged:
+				right_unmatched.remove(other)
+				result.append(merged)
+				break
+		else:
+			result.append(message)
+	for message in right_unmatched:
+		result.append(message)
+
+	return result
+
+
+def main(*files):
+	batches = [json.load(open(file)) for file in files]
+	result = batches[0]
+	start = time.monotonic()
+	for batch in batches[1:]:
+		result = merge_messages(result, batch)
+	interval = time.monotonic() - start
+	hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
+	print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
+
+if __name__ == '__main__':
+	import sys
+	main(*sys.argv[1:])
diff --git a/merge_v2.py b/merge_v2.py
new file mode 100644
index 0000000..7752643
--- /dev/null
+++ b/merge_v2.py
@@ -0,0 +1,97 @@
+import json
+import time
+import hashlib
+
+# Calculates intersection of time range of both messages, or None if they don't overlap
+def overlap(a, b):
+	range_start = max(a['time'], b['time'])
+	range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range'])
+	if range_end < range_start:
+		return None
+	return range_start, range_end - range_start
+
+# Returns merged message if two messages are compatible with being the same message,
+# or else None.
+def merge_message(a, b):
+	o = overlap(a, b)
+	if o and all(
+		a.get(k) == b.get(k)
+		for k in set(a.keys()) | set(b.keys())
+		if k not in ("receivers", "time", "time_range")
+	):
+		receivers = a["receivers"] | b["receivers"]
+		# Error checking - make sure no receiver timestamps are being overwritten.
+		# This would indicate we're merging two messages received at different times
+		# by the same recipient.
+		for k in receivers.keys():
+			for old in (a, b):
+				if k in old and old[k] != receivers[k]:
+					raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
+		return a | {
+			"time": o[0],
+			"time_range": o[1],
+			"receivers": receivers,
+		}
+	return None
+
+def merge_messages(left, right):
+	if not left:
+		return right
+	if not right:
+		return left
+
+	result = []
+	while left and right:
+		# Find earliest message out of left and right.
+		# The other side becomes the candidate messages.
+		if left[0]['time'] <= right[0]['time']:
+			message, candidates = left.pop(0), right
+		else:
+			message, candidates = right.pop(0), left
+
+		# Scan candidates for matching message until found or we know there is no match
+		id = message.get("tags", {}).get("id")
+		end = message['time'] + message['time_range']
+		merged = None
+		for index, candidate in enumerate(candidates):
+			# optimization: stop when earliest candidate is after message's end time
+			if end < candidate['time']:
+				break
+			if id is None:
+				merged = merge_message(message, candidate)
+				if merged is not None:
+					candidates.pop(index)
+					break
+			elif candidate.get("tags", {}).get("id") == id:
+				merged = merge_message(message, candidate)
+				if merged is None:
+					raise ValueError("TODO")
+				candidates.pop(index)
+				break
+
+		# If unmatched, just keep original
+		if merged is None:
+			merged = message
+
+		result.append(message)
+
+	# Once one side is exhausted, the other side must have all remaining messages unmatched.
+	# So just append everything.
+	result += left + right
+
+	return result
+
+
+def main(*files):
+	batches = [json.load(open(file)) for file in files]
+	result = batches[0]
+	start = time.monotonic()
+	for batch in batches[1:]:
+		result = merge_messages(result, batch)
+	interval = time.monotonic() - start
+	hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
+	print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
+
+if __name__ == '__main__':
+	import sys
+	main(*sys.argv[1:])
From 3ad24f5aece52983f25d7e0dcf92cde902f79ed9 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Sat, 9 Nov 2024 02:41:15 +0800
Subject: [PATCH 2/7] get working

---
 merge_v1.py |  7 ++++++-
 merge_v2.py | 11 ++++++++---
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/merge_v1.py b/merge_v1.py
index 164ee65..af6acd7 100644
--- a/merge_v1.py
+++ b/merge_v1.py
@@ -81,8 +81,13 @@ def merge_messages(left, right):
 	return result
 
 
+def load(file):
+	with open(file) as f:
+		return [json.loads(line) for line in f.read().strip().split("\n")]
+
+
 def main(*files):
-	batches = [json.load(open(file)) for file in files]
+	batches = [load(file) for file in files]
 	result = batches[0]
 	start = time.monotonic()
 	for batch in batches[1:]:
diff --git a/merge_v2.py b/merge_v2.py
index 7752643..a2d0e69 100644
--- a/merge_v2.py
+++ b/merge_v2.py
@@ -50,7 +50,7 @@ def merge_messages(left, right):
 			message, candidates = right.pop(0), left
 
 		# Scan candidates for matching message until found or we know there is no match
-		id = message.get("tags", {}).get("id")
+		id = (message.get("tags") or {}).get("id")
 		end = message['time'] + message['time_range']
 		merged = None
 		for index, candidate in enumerate(candidates):
@@ -62,7 +62,7 @@ def merge_messages(left, right):
 				if merged is not None:
 					candidates.pop(index)
 					break
-			elif candidate.get("tags", {}).get("id") == id:
+			elif (candidate.get("tags") or {}).get("id") == id:
 				merged = merge_message(message, candidate)
 				if merged is None:
 					raise ValueError("TODO")
@@ -82,8 +82,13 @@ def merge_messages(left, right):
 	return result
 
 
+def load(file):
+	with open(file) as f:
+		return [json.loads(line) for line in f.read().strip().split("\n")]
+
+
 def main(*files):
-	batches = [json.load(open(file)) for file in files]
+	batches = [load(file) for file in files]
 	result = batches[0]
 	start = time.monotonic()
 	for batch in batches[1:]:
From 91e3e54a14c63964de733e6594c39e71ae553244 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Thu, 20 Mar 2025 04:04:21 +1100
Subject: [PATCH 4/7] add ability to dump output to debug

---
 merge_v1.py | 15 +++++++++++++--
 merge_v2.py | 11 +++++++++--
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/merge_v1.py b/merge_v1.py
index af6acd7..fbc757a 100644
--- a/merge_v1.py
+++ b/merge_v1.py
@@ -87,14 +87,25 @@ def load(file):
 
 
 def main(*files):
+	out = False
+	if files and files[0] == "--out":
+		files = files[1:]
+		out = True
 	batches = [load(file) for file in files]
 	result = batches[0]
 	start = time.monotonic()
 	for batch in batches[1:]:
 		result = merge_messages(result, batch)
 	interval = time.monotonic() - start
-	hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
-	print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
+	# v1 version must be corrected to have messages in time order, which v2 does by default
+	# but causes problems when comparing.
+	# We don't count this against the execution time.
+	result.sort(key=lambda o: o["time"])
+	if out:
+		print(json.dumps(result))
+	else:
+		hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
+		print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
 
 if __name__ == '__main__':
 	import sys
diff --git a/merge_v2.py b/merge_v2.py
index a2d0e69..2751d92 100644
--- a/merge_v2.py
+++ b/merge_v2.py
@@ -88,14 +88,21 @@ def load(file):
 
 
 def main(*files):
+	out = False
+	if files and files[0] == "--out":
+		files = files[1:]
+		out = True
 	batches = [load(file) for file in files]
 	result = batches[0]
 	start = time.monotonic()
 	for batch in batches[1:]:
 		result = merge_messages(result, batch)
 	interval = time.monotonic() - start
-	hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
-	print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
+	if out:
+		print(json.dumps(result))
+	else:
+		hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
+		print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
 
 if __name__ == '__main__':
 	import sys
From eeeb48c0f85f761ab49a047a9a7adbab32111a63 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Thu, 20 Mar 2025 04:27:36 +1100
Subject: [PATCH 5/7] add third implementation

which is really just the second implementation but generalized to N-way
merge instead of 2-way.

Basic benchmark results:
                         v1    v2    v3
1 hour (60 batches):     19ms  31ms  23ms
24 hours (1440 batches): 34s   130s  17s

So it's about as fast as v1 when it doesn't matter, and twice as fast
when it does.
---
 merge_v3.py | 104 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 104 insertions(+)
 create mode 100644 merge_v3.py

diff --git a/merge_v3.py b/merge_v3.py
new file mode 100644
index 0000000..5bf6367
--- /dev/null
+++ b/merge_v3.py
@@ -0,0 +1,104 @@
+import json
+import time
+import hashlib
+
+# Calculates intersection of time range of both messages, or None if they don't overlap
+def overlap(a, b):
+	range_start = max(a['time'], b['time'])
+	range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range'])
+	if range_end < range_start:
+		return None
+	return range_start, range_end - range_start
+
+# Returns merged message if two messages are compatible with being the same message,
+# or else None.
+def merge_message(a, b):
+	o = overlap(a, b)
+	if o and all(
+		a.get(k) == b.get(k)
+		for k in set(a.keys()) | set(b.keys())
+		if k not in ("receivers", "time", "time_range")
+	):
+		receivers = a["receivers"] | b["receivers"]
+		# Error checking - make sure no receiver timestamps are being overwritten.
+		# This would indicate we're merging two messages received at different times
+		# by the same recipient.
+		for k in receivers.keys():
+			for old in (a, b):
+				if k in old and old[k] != receivers[k]:
+					raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
+		return a | {
+			"time": o[0],
+			"time_range": o[1],
+			"receivers": receivers,
+		}
+	return None
+
+def merge_messages(*batches):
+	batches = [b for b in batches if len(b) > 0]
+
+	result = []
+	while len(batches) > 1:
+		# Find batch with the earliest message
+		first_batch = min(batches, key=lambda b: b[0]["time"])
+		message = first_batch.pop(0)
+		id = (message.get("tags") or {}).get("id")
+
+		# For each other batch, attempt to find a matching message
+		for batch in batches:
+			if batch is first_batch:
+				continue
+			end = message["time"] + message["time_range"]
+			merged = None
+			for index, candidate in enumerate(batch):
+				# optimization: stop when earliest candidate is after message's end time
+				if end < candidate['time']:
+					break
+				if id is None:
+					merged = merge_message(message, candidate)
+					if merged is not None:
+						candidates.pop(index)
+						break
+				elif (candidate.get("tags") or {}).get("id") == id:
+					merged = merge_message(message, candidate)
+					if merged is None:
+						raise ValueError("TODO")
+					candidates.pop(index)
+					break
+			if merged is not None:
+				message = merged
+
+		result.append(message)
+		batches = [b for b in batches if len(b) > 0]
+
+	# Once all but one batch is exhausted, the last one must have all remaining messages unmatched.
+	# So just append everything.
+	if batches:
+		result += batches[0]
+
+	return result
+
+
+def load(file):
+	with open(file) as f:
+		return [json.loads(line) for line in f.read().strip().split("\n")]
+
+
+def main(*files):
+	out = False
+	if files and files[0] == "--out":
+		files = files[1:]
+		out = True
+	batches = [load(file) for file in files]
+	start = time.monotonic()
+	result = merge_messages(*batches)
+	interval = time.monotonic() - start
+	if out:
+		print(json.dumps(result))
+	else:
+		hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
+		print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
+
+if __name__ == '__main__':
+	import sys
+	main(*sys.argv[1:])
From c87a25e875a56443c4d0334f3b0f08e90caf7f4f Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Thu, 20 Mar 2025 04:49:06 +1100
Subject: [PATCH 6/7] merge_v4 is more specialized to merge large numbers of
 non-overlapping batches

It can do 24h of batches in 180ms, ~180x faster than v1.

I'm a little concerned it's overly specialized for that role and will be
slow to merge duplicate batches, but those operations involve tiny
amounts of data by comparison so it doesn't really matter.
---
 merge_v4.py | 132 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 132 insertions(+)
 create mode 100644 merge_v4.py

diff --git a/merge_v4.py b/merge_v4.py
new file mode 100644
index 0000000..20df1ab
--- /dev/null
+++ b/merge_v4.py
@@ -0,0 +1,132 @@
+import json
+import time
+import hashlib
+
+# Calculates intersection of time range of both messages, or None if they don't overlap
+def overlap(a, b):
+	range_start = max(a['time'], b['time'])
+	range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range'])
+	if range_end < range_start:
+		return None
+	return range_start, range_end - range_start
+
+# Returns merged message if two messages are compatible with being the same message,
+# or else None.
+def merge_message(a, b):
+	o = overlap(a, b)
+	if o and all(
+		a.get(k) == b.get(k)
+		for k in set(a.keys()) | set(b.keys())
+		if k not in ("receivers", "time", "time_range")
+	):
+		receivers = a["receivers"] | b["receivers"]
+		# Error checking - make sure no receiver timestamps are being overwritten.
+		# This would indicate we're merging two messages received at different times
+		# by the same recipient.
+		for k in receivers.keys():
+			for old in (a, b):
+				if k in old and old[k] != receivers[k]:
+					raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
+		return a | {
+			"time": o[0],
+			"time_range": o[1],
+			"receivers": receivers,
+		}
+	return None
+
+def merge_messages(*batches):
+	batches = [b for b in batches if len(b) > 0]
+
+	result = []
+	while len(batches) > 1:
+		# Shortcut for non-overlapping time ranges - if one batch has messages that end before
+		# the start of any other batch, take those messages directly without trying to match them.
+
+		# Find batches with the earliest and second earliest messages
+		first = batches[0]
+		second = batches[1]
+		if first[0]["time"] > second[0]["time"]:
+			first, second = second, first
+		for batch in batches[2:]:
+			if batch[0]["time"] < first[0]["time"]:
+				first, second = batch, first
+			elif batch[0]["time"] < second[0]["time"]:
+				second = batch
+
+		# Find messages in first batch that end before the start of second batch,
+		# and copy them directly to result.
+		cutoff = second[0]["time"]
+		while first and first[0]["time"] + first[0]["time_range"] < cutoff:
+			result.append(first[0])
+			first.pop(0)
+
+		# If first now overlaps with second, move on to try to find messages to merge.
+		# If no overlap (either first is exhausted, or second now starts sooner than first)
+		# then just start again from the top and look for more non-overlapping ranges.
+		if not first:
+			batches.remove(first)
+			continue
+		if cutoff < first[0]["time"]:
+			continue
+
+		message = first.pop(0)
+		id = (message.get("tags") or {}).get("id")
+
+		# For each other batch, attempt to find a matching message
+		for batch in batches:
+			if batch is first:
+				continue
+			end = message["time"] + message["time_range"]
+			merged = None
+			for index, candidate in enumerate(batch):
+				# optimization: stop when earliest candidate is after message's end time
+				if end < candidate['time']:
+					break
+				if id is None:
+					merged = merge_message(message, candidate)
+					if merged is not None:
+						candidates.pop(index)
+						break
+				elif (candidate.get("tags") or {}).get("id") == id:
+					merged = merge_message(message, candidate)
+					if merged is None:
+						raise ValueError("TODO")
+					candidates.pop(index)
+					break
+			if merged is not None:
+				message = merged
+
+		result.append(message)
+		batches = [b for b in batches if len(b) > 0]
+
+	# Once all but one batch is exhausted, the last one must have all remaining messages unmatched.
+	# So just append everything.
+	if batches:
+		result += batches[0]
+
+	return result
+
+
+def load(file):
+	with open(file) as f:
+		return [json.loads(line) for line in f.read().strip().split("\n")]
+
+
+def main(*files):
+	out = False
+	if files and files[0] == "--out":
+		files = files[1:]
+		out = True
+	batches = [load(file) for file in files]
+	start = time.monotonic()
+	result = merge_messages(*batches)
+	interval = time.monotonic() - start
+	if out:
+		print(json.dumps(result))
+	else:
+		hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
+		print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
+
+if __name__ == '__main__':
+	import sys
+	main(*sys.argv[1:])
From 50c233b58c68b9efd5242cc2b0d72ae5ab16e798 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Thu, 20 Mar 2025 04:53:52 +1100
Subject: [PATCH 7/7] fix copy paste error in v3/v4 and benchmark duplicate
 merges

Merging 1000 duplicates of the same batch:
v1: 98ms
v2: 101ms
v3: 116ms
v4: 116ms

So as expected v3 and v4 have a small cost with duplicates for
optimizing for non-overlapping, but even with ridiculous numbers
duplicates are fast to process anyway.
---
 merge_v3.py | 4 ++--
 merge_v4.py | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/merge_v3.py b/merge_v3.py
index 5bf6367..3cc3137 100644
--- a/merge_v3.py
+++ b/merge_v3.py
@@ -57,13 +57,13 @@ def merge_messages(*batches):
 				if id is None:
 					merged = merge_message(message, candidate)
 					if merged is not None:
-						candidates.pop(index)
+						batch.pop(index)
 						break
 				elif (candidate.get("tags") or {}).get("id") == id:
 					merged = merge_message(message, candidate)
 					if merged is None:
 						raise ValueError("TODO")
-					candidates.pop(index)
+					batch.pop(index)
 					break
 			if merged is not None:
 				message = merged
diff --git a/merge_v4.py b/merge_v4.py
index 20df1ab..6983f98 100644
--- a/merge_v4.py
+++ b/merge_v4.py
@@ -85,13 +85,13 @@ def merge_messages(*batches):
 				if id is None:
 					merged = merge_message(message, candidate)
 					if merged is not None:
-						candidates.pop(index)
+						batch.pop(index)
 						break
 				elif (candidate.get("tags") or {}).get("id") == id:
 					merged = merge_message(message, candidate)
 					if merged is None:
 						raise ValueError("TODO")
-					candidates.pop(index)
+					batch.pop(index)
 					break
 			if merged is not None:
 				message = merged