@@ -1,5 +1,6 @@
 import base64
+import errno
 import hashlib
 import json
 import logging
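
The new errno import backs the missing-directory handling added to get_batch_files further down. As a rough illustration of that idiom (list_dir_or_empty is a made-up helper, not part of the patched module), a directory that does not exist yet is treated as "no batches", while any other OSError still propagates:

    import errno
    import os

    def list_dir_or_empty(path):
        # Missing directory -> nothing archived yet; anything else (permissions, I/O) is a real error.
        try:
            return os.listdir(path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
            return []

    print(list_dir_or_empty("/no/such/dir"))  # []
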
@@ -53,7 +54,7 @@ class Archiver(object):
 		self.logger = logging.getLogger(type(self).__name__).getChild(channel)
 		self.name = name
 		self.messages = gevent.queue.Queue()
-		self.path = os.path.join(base_dir, channel)
+		self.path = os.path.join(base_dir, channel, "chat")
 
 		self.stopping = gevent.event.Event()
 		self.got_reconnect = gevent.event.Event()
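
The extra "chat" component just nests the archiver's output one level deeper under its channel directory. With invented values base_dir="/data" and channel="somechannel", batches move from /data/somechannel/ to /data/somechannel/chat/:

    import os

    print(os.path.join("/data", "somechannel", "chat"))  # /data/somechannel/chat
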
@@ -188,8 +189,9 @@ class Archiver(object):
 def write_batch(path, batch_time, messages):
 	output = (format_batch(messages) + '\n').encode('utf-8')
 	hash = base64.b64encode(hashlib.sha256(output).digest(), b"-_").decode().rstrip("=")
-	time = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H:%M:%S")
-	filename = "{}-{}.json".format(time, hash)
+	hour = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H")
+	time = datetime.utcfromtimestamp(batch_time).strftime("%M:%S")
+	filename = os.path.join(hour, "{}-{}.json".format(time, hash))
 	filepath = os.path.join(path, filename)
 	if os.path.exists(filepath):
 		logging.info("Not writing batch {} - already exists.".format(filename))
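
Batches are now sharded into one directory per UTC hour, with only the minutes and seconds left in the filename. A minimal sketch of the resulting relative path (example_batch_path is a hypothetical stand-in, not a function from this module):

    import base64
    import hashlib
    import os
    from datetime import datetime

    def example_batch_path(batch_time, output):
        # Mirrors the naming above: <YYYY-MM-DDTHH>/<MM:SS>-<urlsafe sha256, padding stripped>.json
        hash = base64.b64encode(hashlib.sha256(output).digest(), b"-_").decode().rstrip("=")
        hour = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H")
        time = datetime.utcfromtimestamp(batch_time).strftime("%M:%S")
        return os.path.join(hour, "{}-{}.json".format(time, hash))

    print(example_batch_path(1600000000, b"[]\n"))  # 2020-09-13T12/26:40-<43-char urlsafe hash>.json
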
@@ -219,10 +221,19 @@ def format_batch(messages):
 
 def get_batch_files(path, batch_time):
 	"""Returns list of batch filepaths for a given batch time"""
-	time = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H:%M:%S-")
+	hour = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H")
+	time = datetime.utcfromtimestamp(batch_time).strftime("%M:%S")
+	hourdir = os.path.join(path, hour)
+	# return [] if dir doesn't exist
+	try:
+		files = os.listdir(hourdir)
+	except OSError as e:
+		if e.errno != errno.ENOENT:
+			raise
+		return []
 	return [
-		os.path.join(path, name)
-		for name in os.listdir(path)
+		os.path.join(hourdir, name)
+		for name in files
 		if name.startswith(time) and name.endswith(".json")
 	]
 
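
A rough usage sketch of the new lookup (all directory and file names invented): only the hour directory for the requested batch time is listed, and only entries with the matching MM:SS prefix and a .json suffix come back.

    import os
    import tempfile
    from datetime import datetime

    base = tempfile.mkdtemp()
    hourdir = os.path.join(base, "2020-09-13T12")
    os.makedirs(hourdir)
    for name in ["26:40-hashA.json", "26:40-hashB.json", "27:00-hashC.json", "26:40-hashD.tmp"]:
        open(os.path.join(hourdir, name), "w").close()

    batch_time = 1600000000  # 2020-09-13 12:26:40 UTC
    hour = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H")
    time = datetime.utcfromtimestamp(batch_time).strftime("%M:%S")
    print(sorted(
        name for name in os.listdir(os.path.join(base, hour))
        if name.startswith(time) and name.endswith(".json")
    ))  # ['26:40-hashA.json', '26:40-hashB.json']
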
@@ -232,10 +243,12 @@ def merge_all(path):
 	Returns once it finds no duplicate files."""
 	while True:
 		by_time = {}
-		for name in os.listdir(path):
-			if not name.endswith(".json"):
-				continue
-			timestamp = "-".join(name.split("-")[:3])
-			by_time[timestamp] = by_time.get(timestamp, 0) + 1
+		for hour in os.listdir(path):
+			for name in os.listdir(os.path.join(path, hour)):
+				if not name.endswith(".json"):
+					continue
+				time = name.split("-")[0]
+				timestamp = "{}:{}".format(hour, time)
+				by_time[timestamp] = by_time.get(timestamp, 0) + 1
 		if not any(count > 1 for timestamp, count in by_time.items()):
 			break
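
To make the duplicate-count key concrete, a small worked example with invented filenames: the hour directory plus the MM:SS prefix rebuilds the same YYYY-MM-DDTHH:MM:SS timestamp the flat layout used, and because the urlsafe-base64 hash can itself contain "-", only the part before the first dash is taken as the time.

    hour = "2020-09-13T12"
    names = ["26:40-abc-def.json", "26:40-ghij_kl.json", "27:00-mnop.json"]

    by_time = {}
    for name in names:
        time = name.split("-")[0]                # "26:40" or "27:00", even when the hash contains "-"
        timestamp = "{}:{}".format(hour, time)   # e.g. "2020-09-13T12:26:40"
        by_time[timestamp] = by_time.get(timestamp, 0) + 1

    print(by_time)  # {'2020-09-13T12:26:40': 2, '2020-09-13T12:27:00': 1} -> one batch time needs merging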