Compare commits

...

4 Commits

Author SHA1 Message Date
Mike Lang 50c233b58c fix copy paste error in v3/v4 and benchmark duplicate merges
Merging 1000 duplicates of the same batch:
v1: 98ms
v2: 101ms
v3: 116ms
v4: 116ms

So as expected, v3 and v4 pay a small cost for duplicates due to optimizing for non-overlapping batches,
but even with ridiculous numbers of duplicates they are fast to process anyway.
5 months ago
Mike Lang c87a25e875 merge_v4 is more specialized to merge large numbers of non-overlapping batches
It can do 24h of batches in 180ms, ~180x faster than v1.

I'm a little concerned it's overly specialized for that role and will be slow to merge duplicate batches,
but those operations involve tiny amounts of data by comparison so it doesn't really matter.
5 months ago
Mike Lang eeeb48c0f8 add third implementation
which is really just the second implementation but generalized to N-way merge instead of 2-way.

Basic benchmark results:
                         v1   v2   v3
    1 hour (60 batches): 19ms 31ms 23ms
24 hours (1440 batches): 34s  130s 17s

So it's about as fast as v1 when it doesn't matter, and twice as fast when it does.
5 months ago
Mike Lang 91e3e54a14 add ability to dump output to debug 5 months ago

@ -87,14 +87,25 @@ def load(file):
def main(*files):
out = False
if files and files[0] == "--out":
files = files[1:]
out = True
batches = [load(file) for file in files]
result = batches[0]
start = time.monotonic()
for batch in batches[1:]:
result = merge_messages(result, batch)
interval = time.monotonic() - start
hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
# v1 version must be corrected to have messages in time order, which v2 does by default
# but causes problems when comparing.
# We don't count this against the execution time.
result.sort(key=lambda o: o["time"])
if out:
print(json.dumps(result))
else:
hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
if __name__ == '__main__':
import sys

@ -88,14 +88,21 @@ def load(file):
def main(*files):
out = False
if files and files[0] == "--out":
files = files[1:]
out = True
batches = [load(file) for file in files]
result = batches[0]
start = time.monotonic()
for batch in batches[1:]:
result = merge_messages(result, batch)
interval = time.monotonic() - start
hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
if out:
print(json.dumps(result))
else:
hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
if __name__ == '__main__':
import sys

@ -0,0 +1,104 @@
import json
import time
import hashlib
# Calculates intersection of time range of both messages, or None if they don't overlap
def overlap(a, b):
    """Return (start, length) of the intersection of a's and b's time ranges.

    A shared boundary (ranges that merely touch) counts as an overlap of
    length 0. Returns None if the ranges do not intersect at all.
    """
    range_start = max(a['time'], b['time'])
    range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range'])
    if range_end < range_start:
        return None
    return range_start, range_end - range_start

# Returns merged message if two messages are compatible with being the same message,
# or else None.
def merge_message(a, b):
    """Merge messages a and b if they are compatible with being the same message.

    Two messages are compatible when their time ranges overlap and every field
    other than "receivers", "time" and "time_range" is equal. The merged
    message narrows the time range to the overlap and unions the receivers.
    Returns the merged message, or None if the messages are not compatible.
    Raises ValueError if a receiver shared by both messages reports different
    timestamps.
    """
    o = overlap(a, b)
    if o and all(
        a.get(k) == b.get(k)
        for k in set(a.keys()) | set(b.keys())
        if k not in ("receivers", "time", "time_range")
    ):
        receivers = a["receivers"] | b["receivers"]
        # Error checking - make sure no receiver timestamps are being overwritten.
        # This would indicate we're merging two messages received at different times
        # by the same recipient.
        # Fix: compare against each input's "receivers" dict - the original code
        # checked `k in old` on the message itself, so the check could never fire.
        for k, timestamp in receivers.items():
            for old in (a, b):
                if k in old["receivers"] and old["receivers"][k] != timestamp:
                    raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
        return a | {
            "time": o[0],
            "time_range": o[1],
            "receivers": receivers,
        }
    return None
def merge_messages(*batches):
    """N-way merge of batches of messages into a single list.

    Each batch must be a list of messages sorted by "time". Messages that are
    compatible with being the same message (see merge_message) are combined;
    all others are passed through unchanged, in time order.
    NOTE: the input batches are consumed (mutated) by this function.
    Raises ValueError if two messages share an id but cannot be merged.
    """
    # Drop empty batches up front so b[0] below is always valid.
    batches = [b for b in batches if len(b) > 0]
    result = []
    while len(batches) > 1:
        # Find batch with the earliest message
        first_batch = min(batches, key=lambda b: b[0]["time"])
        message = first_batch.pop(0)
        id = (message.get("tags") or {}).get("id")
        # For each other batch, attempt to find a matching message
        for batch in batches:
            if batch is first_batch:
                continue
            end = message["time"] + message["time_range"]
            merged = None
            for index, candidate in enumerate(batch):
                # optimization: stop when earliest candidate is after message's end time
                if end < candidate['time']:
                    break
                if id is None:
                    merged = merge_message(message, candidate)
                    if merged is not None:
                        batch.pop(index)
                        break
                elif (candidate.get("tags") or {}).get("id") == id:
                    merged = merge_message(message, candidate)
                    if merged is None:
                        # Matching ids should always imply mergeable messages.
                        raise ValueError(f"Messages with the same id are not compatible: {message}, {candidate}")
                    batch.pop(index)
                    break
            if merged is not None:
                message = merged
        result.append(message)
        batches = [b for b in batches if len(b) > 0]
    # Once all but one batch is exhausted, the last one must have all remaining messages unmatched.
    # So just append everything.
    if batches:
        result += batches[0]
    return result
def load(file):
    """Read a file of newline-delimited JSON and return the decoded objects as a list."""
    with open(file) as f:
        content = f.read().strip()
    return [json.loads(line) for line in content.split("\n")]
def main(*files):
    """Merge the message batches stored in the given files.

    With a leading "--out" flag, dump the merged result as JSON; otherwise
    print the batch count, elapsed merge time and a sha256 of the result.
    """
    dump_output = bool(files) and files[0] == "--out"
    if dump_output:
        files = files[1:]
    batches = [load(path) for path in files]
    start = time.monotonic()
    result = merge_messages(*batches)
    interval = time.monotonic() - start
    if dump_output:
        print(json.dumps(result))
    else:
        hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
        print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
# Script entry point: every command-line argument is a file of messages to merge.
if __name__ == '__main__':
    import sys
    main(*sys.argv[1:])

@ -0,0 +1,132 @@
import json
import time
import hashlib
# Calculates intersection of time range of both messages, or None if they don't overlap
def overlap(a, b):
    """Return (start, length) of the intersection of a's and b's time ranges.

    Ranges that merely touch yield an overlap of length 0. Returns None when
    the ranges do not intersect.
    """
    range_start = max(a['time'], b['time'])
    range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range'])
    if range_end < range_start:
        return None
    return range_start, range_end - range_start

# Returns merged message if two messages are compatible with being the same message,
# or else None.
def merge_message(a, b):
    """Merge messages a and b if they are compatible with being the same message.

    Compatible means: overlapping time ranges, and all fields other than
    "receivers", "time" and "time_range" equal. The merged message narrows the
    time range to the overlap and unions the receivers. Returns None if the
    messages are not compatible. Raises ValueError if a receiver shared by
    both messages reports different timestamps.
    """
    o = overlap(a, b)
    if o and all(
        a.get(k) == b.get(k)
        for k in set(a.keys()) | set(b.keys())
        if k not in ("receivers", "time", "time_range")
    ):
        receivers = a["receivers"] | b["receivers"]
        # Error checking - make sure no receiver timestamps are being overwritten.
        # This would indicate we're merging two messages received at different times
        # by the same recipient.
        # Fix: compare against each input's "receivers" dict - the original code
        # checked `k in old` on the message itself, so the check could never fire.
        for k, timestamp in receivers.items():
            for old in (a, b):
                if k in old["receivers"] and old["receivers"][k] != timestamp:
                    raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
        return a | {
            "time": o[0],
            "time_range": o[1],
            "receivers": receivers,
        }
    return None
def merge_messages(*batches):
    """N-way merge of batches of messages, optimized for non-overlapping batches.

    Each batch must be a list of messages sorted by "time". Runs of messages
    that end before every other batch begins are copied straight to the output
    without attempting to match them; only time-overlapping messages go through
    the more expensive merge_message matching.
    NOTE: the input batches are consumed (mutated) by this function.
    Raises ValueError if two messages share an id but cannot be merged.
    """
    # Drop empty batches up front so batches[0] / batches[1] below are valid.
    batches = [b for b in batches if len(b) > 0]
    result = []
    while len(batches) > 1:
        # Shortcut for non-overlapping time ranges - if one batch has messages that end before
        # the start of any other batch, take those messages directly without trying to match them.
        # Find batches with the earliest and second earliest messages
        first = batches[0]
        second = batches[1]
        if first[0]["time"] > second[0]["time"]:
            first, second = second, first
        for batch in batches[2:]:
            if batch[0]["time"] < first[0]["time"]:
                first, second = batch, first
            elif batch[0]["time"] < second[0]["time"]:
                second = batch
        # Find messages in first batch that end before the start of second batch,
        # and copy them directly to result.
        cutoff = second[0]["time"]
        while first and first[0]["time"] + first[0]["time_range"] < cutoff:
            result.append(first[0])
            first.pop(0)
        # If first now overlaps with second, move on to try to find messages to merge.
        # If no overlap (either first is exhausted, or second now starts sooner than first)
        # then just start again from the top and look for more non-overlapping ranges.
        if not first:
            batches.remove(first)
            continue
        if cutoff < first[0]["time"]:
            continue
        message = first.pop(0)
        id = (message.get("tags") or {}).get("id")
        # For each other batch, attempt to find a matching message
        for batch in batches:
            if batch is first:
                continue
            end = message["time"] + message["time_range"]
            merged = None
            for index, candidate in enumerate(batch):
                # optimization: stop when earliest candidate is after message's end time
                if end < candidate['time']:
                    break
                if id is None:
                    merged = merge_message(message, candidate)
                    if merged is not None:
                        batch.pop(index)
                        break
                elif (candidate.get("tags") or {}).get("id") == id:
                    merged = merge_message(message, candidate)
                    if merged is None:
                        # Matching ids should always imply mergeable messages.
                        raise ValueError(f"Messages with the same id are not compatible: {message}, {candidate}")
                    batch.pop(index)
                    break
            if merged is not None:
                message = merged
        result.append(message)
        batches = [b for b in batches if len(b) > 0]
    # Once all but one batch is exhausted, the last one must have all remaining messages unmatched.
    # So just append everything.
    if batches:
        result += batches[0]
    return result
def load(file):
    """Parse one JSON document per line from the given path, returning a list of objects."""
    with open(file) as f:
        lines = f.read().strip().split("\n")
    return list(map(json.loads, lines))
def main(*files):
    """Entry point: merge the message batches loaded from *files*.

    If the first argument is "--out", the merged messages are printed as JSON;
    otherwise the batch count, elapsed merge time and a sha256 of the result
    are reported.
    """
    out = False
    if files and files[0] == "--out":
        out = True
        files = files[1:]
    batches = list(map(load, files))
    start = time.monotonic()
    result = merge_messages(*batches)
    interval = time.monotonic() - start
    if not out:
        hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
        print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")
    else:
        print(json.dumps(result))
# Script entry point: every command-line argument is a file of messages to merge.
if __name__ == '__main__':
    import sys
    main(*sys.argv[1:])
Loading…
Cancel
Save