pull/465/merge
Mike Lang 3 months ago committed by GitHub
commit 3a49499e83
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,112 @@
import json
import time
import hashlib
def overlap(a, b):
    """Return the (start, duration) intersection of two messages' time ranges.

    Each message carries a 'time' (range start) and 'time_range' (duration).
    Returns None when the ranges are disjoint; ranges that merely touch at a
    boundary count as overlapping with duration 0.
    """
    start = a['time'] if a['time'] >= b['time'] else b['time']
    a_end = a['time'] + a['time_range']
    b_end = b['time'] + b['time_range']
    end = a_end if a_end <= b_end else b_end
    return None if end < start else (start, end - start)
# Returns merged message if two messages are compatible with being the same message,
# or else None.
def merge_message(a, b):
    """Merge two records of (presumably) the same message into one.

    Two messages are considered the same if their time ranges overlap and all
    fields other than "receivers", "time" and "time_range" are equal. The
    merged message narrows the time range to the intersection and unions the
    receivers maps. Returns None if the messages are not compatible.

    Raises ValueError if the same receiver saw the message at two different
    timestamps (which would mean these are actually distinct messages).
    """
    o = overlap(a, b)
    if o and all(
        a.get(k) == b.get(k)
        for k in set(a.keys()) | set(b.keys())
        if k not in ("receivers", "time", "time_range")
    ):
        receivers = a["receivers"] | b["receivers"]
        # Error checking - make sure no receiver timestamps are being overwritten.
        # This would indicate we're merging two messages received at different times
        # by the same recipient.
        # Fixed: the check previously looked up receiver keys on the message
        # dicts themselves (`k in old`), which never matched; it must inspect
        # each side's "receivers" sub-dict.
        for k in receivers:
            for old in (a["receivers"], b["receivers"]):
                if k in old and old[k] != receivers[k]:
                    raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
        return a | {
            "time": o[0],
            "time_range": o[1],
            "receivers": receivers,
        }
    return None
def merge_messages(left, right):
    """Merge two batches of messages into one list (v1 strategy).

    Messages bearing a tags['id'] are matched by that id; messages without an
    id are matched by scanning the other side for any compatible message
    (same content, overlapping time range). Unmatched messages pass through
    unchanged. Note this mutates internal working lists only, not the inputs'
    message dicts.

    Raises ValueError if two messages share an id but are not compatible.
    """
    # An optimization - if either side is empty, return the other side without processing.
    if not left:
        return right
    if not right:
        return left
    # Match things with identical ids first, and collect unmatched into left and right lists
    by_id = {}
    unmatched = [], []
    for messages, u in zip((left, right), unmatched):
        for message in messages:
            id = (message.get('tags') or {}).get('id')
            if id:
                by_id.setdefault(id, []).append(message)
            else:
                u.append(message)
    result = []
    for id, messages in by_id.items():
        # Fold all same-id messages together pairwise. The previous code
        # called merge_message(*messages), which raised TypeError if an id
        # somehow appeared more than twice across the two batches.
        merged = messages[0]
        for message in messages[1:]:
            combined = merge_message(merged, message)
            if combined is None:
                raise ValueError(f"Got two non-matching messages with id {id}: {merged}, {message}")
            merged = combined
        result.append(merged)
    # For time-range messages, pair off each one in left with first match in right,
    # and pass through anything with no matches.
    left_unmatched, right_unmatched = unmatched
    for message in left_unmatched:
        for other in right_unmatched:
            merged = merge_message(message, other)
            if merged:
                # Safe: we break immediately after removing, so the iteration
                # over right_unmatched is not resumed after mutation.
                right_unmatched.remove(other)
                result.append(merged)
                break
        else:
            result.append(message)
    for message in right_unmatched:
        result.append(message)
    return result
def load(file):
    """Load a batch of messages from a JSONL file (one JSON object per line).

    Returns an empty list for an empty or whitespace-only file instead of
    raising json.JSONDecodeError as the previous version did.
    """
    with open(file) as f:
        content = f.read().strip()
    return [json.loads(line) for line in content.split("\n")] if content else []
def main(*files):
    """Merge the message batches in the given files and report the result.

    With a leading "--out" argument, print the merged messages as JSON;
    otherwise print a timing summary plus a SHA-256 of the result so runs
    can be compared cheaply.
    """
    out = False
    if files and files[0] == "--out":
        files = files[1:]
        out = True
    batches = [load(file) for file in files]
    result = batches[0]
    start = time.monotonic()
    for batch in batches[1:]:
        result = merge_messages(result, batch)
    interval = time.monotonic() - start
    # v1 version must be corrected to have messages in time order, which v2 does by default
    # but causes problems when comparing.
    # We don't count this against the execution time.
    result.sort(key=lambda o: o["time"])
    if out:
        print(json.dumps(result))
    else:
        # Renamed local from `hash` to `digest` to avoid shadowing the builtin.
        digest = hashlib.sha256(json.dumps(result).encode()).hexdigest()
        print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {digest}")
# Script entry point: all command-line arguments are input files,
# optionally preceded by "--out" to print the merged JSON.
if __name__ == '__main__':
    import sys
    main(*sys.argv[1:])

@ -0,0 +1,109 @@
import json
import time
import hashlib
def overlap(a, b):
    """Return the (start, duration) intersection of two messages' time ranges.

    Each message carries a 'time' (range start) and 'time_range' (duration).
    Returns None when the ranges are disjoint; ranges that merely touch at a
    boundary count as overlapping with duration 0.
    """
    start = a['time'] if a['time'] >= b['time'] else b['time']
    a_end = a['time'] + a['time_range']
    b_end = b['time'] + b['time_range']
    end = a_end if a_end <= b_end else b_end
    return None if end < start else (start, end - start)
# Returns merged message if two messages are compatible with being the same message,
# or else None.
def merge_message(a, b):
    """Merge two records of (presumably) the same message into one.

    Two messages are considered the same if their time ranges overlap and all
    fields other than "receivers", "time" and "time_range" are equal. The
    merged message narrows the time range to the intersection and unions the
    receivers maps. Returns None if the messages are not compatible.

    Raises ValueError if the same receiver saw the message at two different
    timestamps (which would mean these are actually distinct messages).
    """
    o = overlap(a, b)
    if o and all(
        a.get(k) == b.get(k)
        for k in set(a.keys()) | set(b.keys())
        if k not in ("receivers", "time", "time_range")
    ):
        receivers = a["receivers"] | b["receivers"]
        # Error checking - make sure no receiver timestamps are being overwritten.
        # This would indicate we're merging two messages received at different times
        # by the same recipient.
        # Fixed: the check previously looked up receiver keys on the message
        # dicts themselves (`k in old`), which never matched; it must inspect
        # each side's "receivers" sub-dict.
        for k in receivers:
            for old in (a["receivers"], b["receivers"]):
                if k in old and old[k] != receivers[k]:
                    raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
        return a | {
            "time": o[0],
            "time_range": o[1],
            "receivers": receivers,
        }
    return None
def merge_messages(left, right):
    """Merge two time-ordered batches of messages into one list (v2 strategy).

    Repeatedly takes the earliest remaining message from either side and scans
    the other side for a message it can be merged with: by tags['id'] when the
    message has one, otherwise by content/time-range compatibility. Both input
    lists are consumed (mutated) in the process.

    Raises ValueError if two messages share an id but are not compatible.
    """
    if not left:
        return right
    if not right:
        return left
    result = []
    while left and right:
        # Find earliest message out of left and right.
        # The other side becomes the candidate messages.
        if left[0]['time'] <= right[0]['time']:
            message, candidates = left.pop(0), right
        else:
            message, candidates = right.pop(0), left
        # Scan candidates for matching message until found or we know there is no match
        id = (message.get("tags") or {}).get("id")
        end = message['time'] + message['time_range']
        merged = None
        for index, candidate in enumerate(candidates):
            # optimization: stop when earliest candidate is after message's end time
            if end < candidate['time']:
                break
            if id is None:
                merged = merge_message(message, candidate)
                if merged is not None:
                    candidates.pop(index)
                    break
            elif (candidate.get("tags") or {}).get("id") == id:
                merged = merge_message(message, candidate)
                if merged is None:
                    # Fixed: replaced the placeholder ValueError("TODO") with
                    # a descriptive error, matching the v1 wording.
                    raise ValueError(f"Got two non-matching messages with id {id}: {message}, {candidate}")
                candidates.pop(index)
                break
        # If unmatched, just keep original
        if merged is None:
            merged = message
        # Fixed: append the merged message. The previous code appended the
        # pre-merge `message`, silently discarding the merged result while
        # still consuming the matched candidate.
        result.append(merged)
    # Once one side is exhausted, the other side must have all remaining messages unmatched.
    # So just append everything.
    result += left + right
    return result
def load(file):
    """Load a batch of messages from a JSONL file (one JSON object per line).

    Returns an empty list for an empty or whitespace-only file instead of
    raising json.JSONDecodeError as the previous version did.
    """
    with open(file) as f:
        content = f.read().strip()
    return [json.loads(line) for line in content.split("\n")] if content else []
def main(*files):
    """Merge the message batches in the given files and report the result.

    With a leading "--out" argument, print the merged messages as JSON;
    otherwise print a timing summary plus a SHA-256 of the result so runs
    can be compared cheaply.
    """
    out = False
    if files and files[0] == "--out":
        files = files[1:]
        out = True
    batches = [load(file) for file in files]
    result = batches[0]
    start = time.monotonic()
    for batch in batches[1:]:
        result = merge_messages(result, batch)
    interval = time.monotonic() - start
    if out:
        print(json.dumps(result))
    else:
        # Renamed local from `hash` to `digest` to avoid shadowing the builtin.
        digest = hashlib.sha256(json.dumps(result).encode()).hexdigest()
        print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {digest}")
# Script entry point: all command-line arguments are input files,
# optionally preceded by "--out" to print the merged JSON.
if __name__ == '__main__':
    import sys
    main(*sys.argv[1:])

@ -0,0 +1,104 @@
import json
import time
import hashlib
def overlap(a, b):
    """Return the (start, duration) intersection of two messages' time ranges.

    Each message carries a 'time' (range start) and 'time_range' (duration).
    Returns None when the ranges are disjoint; ranges that merely touch at a
    boundary count as overlapping with duration 0.
    """
    start = a['time'] if a['time'] >= b['time'] else b['time']
    a_end = a['time'] + a['time_range']
    b_end = b['time'] + b['time_range']
    end = a_end if a_end <= b_end else b_end
    return None if end < start else (start, end - start)
# Returns merged message if two messages are compatible with being the same message,
# or else None.
def merge_message(a, b):
    """Merge two records of (presumably) the same message into one.

    Two messages are considered the same if their time ranges overlap and all
    fields other than "receivers", "time" and "time_range" are equal. The
    merged message narrows the time range to the intersection and unions the
    receivers maps. Returns None if the messages are not compatible.

    Raises ValueError if the same receiver saw the message at two different
    timestamps (which would mean these are actually distinct messages).
    """
    o = overlap(a, b)
    if o and all(
        a.get(k) == b.get(k)
        for k in set(a.keys()) | set(b.keys())
        if k not in ("receivers", "time", "time_range")
    ):
        receivers = a["receivers"] | b["receivers"]
        # Error checking - make sure no receiver timestamps are being overwritten.
        # This would indicate we're merging two messages received at different times
        # by the same recipient.
        # Fixed: the check previously looked up receiver keys on the message
        # dicts themselves (`k in old`), which never matched; it must inspect
        # each side's "receivers" sub-dict.
        for k in receivers:
            for old in (a["receivers"], b["receivers"]):
                if k in old and old[k] != receivers[k]:
                    raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
        return a | {
            "time": o[0],
            "time_range": o[1],
            "receivers": receivers,
        }
    return None
def merge_messages(*batches):
    """Merge any number of message batches into one time-ordered list (v3 strategy).

    Repeatedly takes the earliest remaining message across all batches and
    attempts to merge it with a matching message from every other batch
    (matching by tags['id'] when present, otherwise by content/time-range
    compatibility). The input batches are consumed (mutated) in the process.

    Raises ValueError if two messages share an id but are not compatible.
    """
    batches = [b for b in batches if len(b) > 0]
    result = []
    while len(batches) > 1:
        # Find batch with the earliest message
        first_batch = min(batches, key=lambda b: b[0]["time"])
        message = first_batch.pop(0)
        id = (message.get("tags") or {}).get("id")
        # For each other batch, attempt to find a matching message
        for batch in batches:
            if batch is first_batch:
                continue
            end = message["time"] + message["time_range"]
            merged = None
            for index, candidate in enumerate(batch):
                # optimization: stop when earliest candidate is after message's end time
                if end < candidate['time']:
                    break
                if id is None:
                    merged = merge_message(message, candidate)
                    if merged is not None:
                        batch.pop(index)
                        break
                elif (candidate.get("tags") or {}).get("id") == id:
                    merged = merge_message(message, candidate)
                    if merged is None:
                        # Fixed: replaced the placeholder ValueError("TODO")
                        # with a descriptive error, matching the v1 wording.
                        raise ValueError(f"Got two non-matching messages with id {id}: {message}, {candidate}")
                    batch.pop(index)
                    break
            if merged is not None:
                # Carry the merged form forward so later batches merge into it too.
                message = merged
        result.append(message)
        batches = [b for b in batches if len(b) > 0]
    # Once all but one batch is exhausted, the last one must have all remaining messages unmatched.
    # So just append everything.
    if batches:
        result += batches[0]
    return result
def load(file):
    """Load a batch of messages from a JSONL file (one JSON object per line).

    Returns an empty list for an empty or whitespace-only file instead of
    raising json.JSONDecodeError as the previous version did.
    """
    with open(file) as f:
        content = f.read().strip()
    return [json.loads(line) for line in content.split("\n")] if content else []
def main(*files):
    """Merge the message batches in the given files and report the result.

    With a leading "--out" argument, print the merged messages as JSON;
    otherwise print a timing summary plus a SHA-256 of the result so runs
    can be compared cheaply.
    """
    out = False
    if files and files[0] == "--out":
        files = files[1:]
        out = True
    batches = [load(file) for file in files]
    start = time.monotonic()
    result = merge_messages(*batches)
    interval = time.monotonic() - start
    if out:
        print(json.dumps(result))
    else:
        # Renamed local from `hash` to `digest` to avoid shadowing the builtin.
        digest = hashlib.sha256(json.dumps(result).encode()).hexdigest()
        print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {digest}")
# Script entry point: all command-line arguments are input files,
# optionally preceded by "--out" to print the merged JSON.
if __name__ == '__main__':
    import sys
    main(*sys.argv[1:])

@ -0,0 +1,132 @@
import json
import time
import hashlib
def overlap(a, b):
    """Return the (start, duration) intersection of two messages' time ranges.

    Each message carries a 'time' (range start) and 'time_range' (duration).
    Returns None when the ranges are disjoint; ranges that merely touch at a
    boundary count as overlapping with duration 0.
    """
    start = a['time'] if a['time'] >= b['time'] else b['time']
    a_end = a['time'] + a['time_range']
    b_end = b['time'] + b['time_range']
    end = a_end if a_end <= b_end else b_end
    return None if end < start else (start, end - start)
# Returns merged message if two messages are compatible with being the same message,
# or else None.
def merge_message(a, b):
    """Merge two records of (presumably) the same message into one.

    Two messages are considered the same if their time ranges overlap and all
    fields other than "receivers", "time" and "time_range" are equal. The
    merged message narrows the time range to the intersection and unions the
    receivers maps. Returns None if the messages are not compatible.

    Raises ValueError if the same receiver saw the message at two different
    timestamps (which would mean these are actually distinct messages).
    """
    o = overlap(a, b)
    if o and all(
        a.get(k) == b.get(k)
        for k in set(a.keys()) | set(b.keys())
        if k not in ("receivers", "time", "time_range")
    ):
        receivers = a["receivers"] | b["receivers"]
        # Error checking - make sure no receiver timestamps are being overwritten.
        # This would indicate we're merging two messages received at different times
        # by the same recipient.
        # Fixed: the check previously looked up receiver keys on the message
        # dicts themselves (`k in old`), which never matched; it must inspect
        # each side's "receivers" sub-dict.
        for k in receivers:
            for old in (a["receivers"], b["receivers"]):
                if k in old and old[k] != receivers[k]:
                    raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
        return a | {
            "time": o[0],
            "time_range": o[1],
            "receivers": receivers,
        }
    return None
def merge_messages(*batches):
    """Merge any number of message batches into one time-ordered list (v4 strategy).

    Like v3, but with a shortcut for non-overlapping time ranges: runs of
    messages in the earliest batch that end before any other batch begins are
    copied straight to the result without attempting to match them. Remaining
    messages are matched by tags['id'] when present, otherwise by content and
    time-range compatibility. The input batches are consumed (mutated).

    Raises ValueError if two messages share an id but are not compatible.
    """
    batches = [b for b in batches if len(b) > 0]
    result = []
    while len(batches) > 1:
        # Shortcut for non-overlapping time ranges - if one batch has messages that end before
        # the start of any other batch, take those messages directly without trying to match them.
        # Find batches with the earliest and second earliest messages
        first = batches[0]
        second = batches[1]
        if first[0]["time"] > second[0]["time"]:
            first, second = second, first
        for batch in batches[2:]:
            if batch[0]["time"] < first[0]["time"]:
                first, second = batch, first
            elif batch[0]["time"] < second[0]["time"]:
                second = batch
        # Find messages in first batch that end before the start of second batch,
        # and copy them directly to result.
        cutoff = second[0]["time"]
        while first and first[0]["time"] + first[0]["time_range"] < cutoff:
            result.append(first[0])
            first.pop(0)
        # If first now overlaps with second, move on to try to find messages to merge.
        # If no overlap (either first is exhausted, or second now starts sooner than first)
        # then just start again from the top and look for more non-overlapping ranges.
        if not first:
            # first is now empty; drop it from consideration. (list.remove
            # compares by ==, but any equal batch here is also empty, so
            # removing either is equivalent.)
            batches.remove(first)
            continue
        if cutoff < first[0]["time"]:
            continue
        message = first.pop(0)
        id = (message.get("tags") or {}).get("id")
        # For each other batch, attempt to find a matching message
        for batch in batches:
            if batch is first:
                continue
            end = message["time"] + message["time_range"]
            merged = None
            for index, candidate in enumerate(batch):
                # optimization: stop when earliest candidate is after message's end time
                if end < candidate['time']:
                    break
                if id is None:
                    merged = merge_message(message, candidate)
                    if merged is not None:
                        batch.pop(index)
                        break
                elif (candidate.get("tags") or {}).get("id") == id:
                    merged = merge_message(message, candidate)
                    if merged is None:
                        # Fixed: replaced the placeholder ValueError("TODO")
                        # with a descriptive error, matching the v1 wording.
                        raise ValueError(f"Got two non-matching messages with id {id}: {message}, {candidate}")
                    batch.pop(index)
                    break
            if merged is not None:
                # Carry the merged form forward so later batches merge into it too.
                message = merged
        result.append(message)
        batches = [b for b in batches if len(b) > 0]
    # Once all but one batch is exhausted, the last one must have all remaining messages unmatched.
    # So just append everything.
    if batches:
        result += batches[0]
    return result
def load(file):
    """Load a batch of messages from a JSONL file (one JSON object per line).

    Returns an empty list for an empty or whitespace-only file instead of
    raising json.JSONDecodeError as the previous version did.
    """
    with open(file) as f:
        content = f.read().strip()
    return [json.loads(line) for line in content.split("\n")] if content else []
def main(*files):
    """Merge the message batches in the given files and report the result.

    With a leading "--out" argument, print the merged messages as JSON;
    otherwise print a timing summary plus a SHA-256 of the result so runs
    can be compared cheaply.
    """
    out = False
    if files and files[0] == "--out":
        files = files[1:]
        out = True
    batches = [load(file) for file in files]
    start = time.monotonic()
    result = merge_messages(*batches)
    interval = time.monotonic() - start
    if out:
        print(json.dumps(result))
    else:
        # Renamed local from `hash` to `digest` to avoid shadowing the builtin.
        digest = hashlib.sha256(json.dumps(result).encode()).hexdigest()
        print(f"Merged {len(batches)} batches in {interval:.3f}s to hash {digest}")
# Script entry point: all command-line arguments are input files,
# optionally preceded by "--out" to print the merged JSON.
if __name__ == '__main__':
    import sys
    main(*sys.argv[1:])
Loading…
Cancel
Save