mirror of https://github.com/ekimekim/wubloader
merge_v4 is more specialized for merging large numbers of non-overlapping batches.
It can do 24h of batches in 180ms, ~180x faster than v1. I'm a little concerned it's overly specialized for that role and will be slow to merge duplicate batches, but those operations involve tiny amounts of data by comparison, so it doesn't really matter.
mike/faster-chat-merge
parent
eeeb48c0f8
commit
c87a25e875
@@ -0,0 +1,132 @@
import json
import time
import hashlib


# Calculates the intersection of the time ranges of both messages, or None if they don't overlap.
def overlap(a, b):
    range_start = max(a['time'], b['time'])
    range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range'])
    if range_end < range_start:
        return None
    return range_start, range_end - range_start
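# Illustrative example, assuming each message carries a start 'time' and a 'time_range' duration:
#   overlap({"time": 0, "time_range": 5}, {"time": 3, "time_range": 5})
#   -> (3, 2), i.e. both messages could have occurred between t=3 and t=5.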


# Returns the merged message if two messages are compatible with being the same message,
# or else None.
def merge_message(a, b):
    o = overlap(a, b)
    if o and all(
        a.get(k) == b.get(k)
        for k in set(a.keys()) | set(b.keys())
        if k not in ("receivers", "time", "time_range")
    ):
        receivers = a["receivers"] | b["receivers"]
        # Error checking - make sure no receiver timestamps are being overwritten.
        # This would indicate we're merging two messages received at different times
        # by the same recipient.
        for k in receivers.keys():
            for old in (a, b):
                if k in old["receivers"] and old["receivers"][k] != receivers[k]:
                    raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
        return a | {
            "time": o[0],
            "time_range": o[1],
            "receivers": receivers,
        }
    return None
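# Illustrative example: the same message seen by two receivers merges to the
# intersection of the time ranges, with the receiver timestamps combined:
#   merge_message(
#       {"text": "hi", "time": 10, "time_range": 4, "receivers": {"a": 11}},
#       {"text": "hi", "time": 12, "time_range": 4, "receivers": {"b": 13}},
#   )
#   -> {"text": "hi", "time": 12, "time_range": 2, "receivers": {"a": 11, "b": 13}}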


def merge_messages(*batches):
    batches = [b for b in batches if len(b) > 0]

    result = []
    while len(batches) > 1:
        # Shortcut for non-overlapping time ranges - if one batch has messages that end before
        # the start of any other batch, take those messages directly without trying to match them.

        # Find batches with the earliest and second earliest messages
        first = batches[0]
        second = batches[1]
        if first[0]["time"] > second[0]["time"]:
            first, second = second, first
        for batch in batches[2:]:
            if batch[0]["time"] < first[0]["time"]:
                first, second = batch, first
            elif batch[0]["time"] < second[0]["time"]:
                second = batch

        # Find messages in first batch that end before the start of second batch,
        # and copy them directly to result.
        cutoff = second[0]["time"]
        while first and first[0]["time"] + first[0]["time_range"] < cutoff:
            result.append(first[0])
            first.pop(0)

        # If first now overlaps with second, move on to try to find messages to merge.
        # If no overlap (either first is exhausted, or second now starts sooner than first)
        # then just start again from the top and look for more non-overlapping ranges.
        if not first:
            batches.remove(first)
            continue
        if cutoff < first[0]["time"]:
            continue

        message = first.pop(0)
        id = (message.get("tags") or {}).get("id")

        # For each other batch, attempt to find a matching message
        for batch in batches:
            if batch is first:
                continue
            end = message["time"] + message["time_range"]
            merged = None
            for index, candidate in enumerate(batch):
                # optimization: stop when earliest candidate is after message's end time
                if end < candidate['time']:
                    break
                if id is None:
                    merged = merge_message(message, candidate)
                    if merged is not None:
                        batch.pop(index)
                        break
                elif (candidate.get("tags") or {}).get("id") == id:
                    merged = merge_message(message, candidate)
                    if merged is None:
                        raise ValueError(f"Messages with the same id do not merge cleanly: {message}, {candidate}")
                    batch.pop(index)
                    break
            if merged is not None:
                message = merged

        result.append(message)
        batches = [b for b in batches if len(b) > 0]

    # Once all but one batch is exhausted, the last one must have all remaining messages unmatched.
    # So just append everything.
    if batches:
        result += batches[0]

    return result
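# Illustrative example: two batches whose time ranges don't overlap pass straight
# through in time order (note the input batch lists are consumed by this function):
#   merge_messages(
#       [{"text": "a", "time": 0, "time_range": 1, "receivers": {"r1": 0}}],
#       [{"text": "b", "time": 5, "time_range": 1, "receivers": {"r2": 5}}],
#   )
#   -> [message "a", message "b"], both unchanged.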


def load(file):
    with open(file) as f:
        return [json.loads(line) for line in f.read().strip().split("\n")]
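# Input format sketch: each file is newline-delimited JSON, one message object per line,
# with the fields assumed by the merge logic above, e.g.:
#   {"text": "hi", "time": 10, "time_range": 4, "receivers": {"a": 11}, "tags": {"id": "abc"}}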


def main(*files):
    out = False
    if files and files[0] == "--out":
        files = files[1:]
        out = True
    batches = [load(file) for file in files]
    start = time.monotonic()
    result = merge_messages(*batches)
    interval = time.monotonic() - start
    if out:
        print(json.dumps(result))
    else:
        hash = hashlib.sha256(json.dumps(result).encode()).hexdigest()
        print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {hash}")


if __name__ == '__main__':
    import sys
    main(*sys.argv[1:])
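

# Example invocation (script and file names hypothetical; see load() for the input format):
#   python merge_v4.py --out chat_a.json chat_b.json > merged.json
#   python merge_v4.py chat_a.json chat_b.json
#   -> prints "Merged 2 batches in <time>s to hash <sha256 hex>"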