@@ -1,4 +1,5 @@
+from calendar import timegm
 import base64
 import hashlib
 import json
@@ -178,12 +179,16 @@ def write_batch(path, batch_time, messages):
     time = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H:%M:%S")
     filename = "{}-{}.json".format(time, hash)
     filepath = os.path.join(path, filename)
-    temppath = "{}.{}.temp".format(filepath, uuid4())
-    ensure_directory(filepath)
-    with open(temppath, 'wb') as f:
-        f.write(output)
-    os.rename(temppath, filepath)
-    logging.info("Wrote batch {}".format(filepath))
+    if os.path.exists(filepath):
+        logging.info("Not writing batch {} - already exists.".format(filename))
+    else:
+        temppath = "{}.{}.temp".format(filepath, uuid4())
+        ensure_directory(filepath)
+        with open(temppath, 'wb') as f:
+            f.write(output)
+        os.rename(temppath, filepath)
+        logging.info("Wrote batch {}".format(filepath))
+    return filepath


 def format_batch(messages):
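
Note: the guarded write above is the classic write-to-temp-then-rename pattern. A minimal standalone sketch of the idea (the function name and imports here are illustrative, not part of this patch):

    import os
    from uuid import uuid4

    def atomic_write(filepath, data):
        # A uuid4() suffix gives each writer its own temp file, so
        # concurrent writers cannot clobber each other's partial output.
        temppath = "{}.{}.temp".format(filepath, uuid4())
        with open(temppath, "wb") as f:
            f.write(data)
        # os.rename() is atomic on POSIX when source and destination are
        # on the same filesystem: readers see either the old file or the
        # new one, never a half-written state.
        os.rename(temppath, filepath)

On Windows, os.rename() raises OSError if the destination already exists; os.replace() is the portable equivalent.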
@@ -210,6 +215,25 @@ def get_batch_files(path, batch_time):
     ]


+def merge_all(path):
+    """Repeatedly scans the batch directory for batch files with the same batch time, and merges them.
+
+    Returns once it finds no duplicate files."""
+    while True:
+        by_time = {}
+        for name in os.listdir(path):
+            if not name.endswith(".json"):
+                continue
+            timestamp = "-".join(name.split("-")[:3])
+            by_time[timestamp] = by_time.get(timestamp, 0) + 1
+        if not any(count > 1 for timestamp, count in by_time.items()):
+            break
+        for timestamp, count in by_time.items():
+            if count > 1:
+                logging.info("Merging {} batches at time {}".format(count, timestamp))
+                batch_time = timegm(time.strptime(timestamp, "%Y-%m-%dT%H:%M:%S"))
+                merge_batch_files(path, batch_time)
+
+
 def merge_batch_files(path, batch_time):
     """For the given batch time, merges all the following messages:

     - From batch files at that time
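
Note: merge_all recovers each batch's timestamp from its filename. Since write_batch names files "{time}-{hash}.json" with time formatted as %Y-%m-%dT%H:%M:%S, the first three "-"-separated fields are exactly the timestamp, and timegm()/strptime() invert the utcfromtimestamp()/strftime() used on write. A quick round-trip check (the hash value is invented):

    import time
    from calendar import timegm

    name = "2024-01-05T12:30:00-d41d8cd9.json"
    timestamp = "-".join(name.split("-")[:3])  # '2024-01-05T12:30:00'
    # timegm() interprets the struct_time as UTC, matching the
    # datetime.utcfromtimestamp() call on the write side.
    batch_time = timegm(time.strptime(timestamp, "%Y-%m-%dT%H:%M:%S"))  # 1704457800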
@@ -241,14 +265,18 @@ def merge_batch_files(path, batch_time):
         batch = [json.loads(line) for line in batch.strip().split("\n")]
         messages = merge_messages(messages, batch)

-    # sorting by time is needed for group_by(), we'll sort properly on save.
-    messages.sort(key=lambda message: message['time'])
-    for batch_time, batch in itertools.group_by(messages, key=
-        lambda message: int(message['time'] / BATCH_INTERVAL) * BATCH_INTERVAL
-    ):
-        write_batch(path, batch_time, batch)
+    by_time = {}
+    for message in messages:
+        batch_time = int(message['time'] / BATCH_INTERVAL) * BATCH_INTERVAL
+        by_time.setdefault(batch_time, []).append(message)
+
+    written = []
+    for batch_time, batch in by_time.items():
+        written.append(write_batch(path, batch_time, batch))

     for batch_file in batch_files:
-        os.remove(batch_file)
+        # don't delete something we just (re-)wrote
+        if batch_file not in written:
+            os.remove(batch_file)
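
Note: swapping the grouping loop for a plain dict is also a bug fix. The removed code calls itertools.group_by, which does not exist (the stdlib spelling is itertools.groupby), and groupby only groups adjacent items, hence the deleted sort; the dict needs neither. The bucketing arithmetic floors each message time down to its interval boundary. For example, assuming BATCH_INTERVAL = 60 (the real value is defined elsewhere in this module):

    BATCH_INTERVAL = 60  # assumed value, for illustration only

    for t in (59, 60, 125):
        print(t, int(t / BATCH_INTERVAL) * BATCH_INTERVAL)
    # 59 -> 0, 60 -> 60, 125 -> 120: each message lands in the
    # batch covering [boundary, boundary + BATCH_INTERVAL)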
@@ -289,7 +317,7 @@ def merge_messages(left, right):
             # by the same recipient.
             for k in receivers.keys():
                 for old in (a, b):
-                    if k in old and old[k] != recievers[k]:
+                    if k in old and old[k] != receivers[k]:
                         raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
             return a | {
                 "time": o[0],
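
Note: besides the spelling fix (the undefined recievers raised NameError on the very code path meant to catch conflicting merges), this block relies on the dict union operator from PEP 584, so it assumes Python 3.9+. The right-hand operand wins on key collisions, which is what lets the merge override "time":

    a = {"time": 10, "text": "hi"}
    a | {"time": 5}  # -> {'time': 5, 'text': 'hi'}: right-hand values win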
@@ -303,7 +331,7 @@ def merge_messages(left, right):
     unmatched = [], []
     for messages, u in zip((left, right), unmatched):
         for message in messages:
-            id = message.get('tags', {}).get('id')
+            id = (message.get('tags') or {}).get('id')
             if id:
                 by_id.setdefault(id, []).append(message)
             else:
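
Note: the new id lookup matters when a message has "tags" present but set to null in the JSON. dict.get() only falls back to its default when the key is missing, not when the stored value is None:

    {"tags": None}.get("tags", {})      # -> None; .get('id') would then raise
    ({"tags": None}.get("tags") or {})  # -> {}; .get('id') returns None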