diff --git a/chat_archiver/chat_archiver/main.py b/chat_archiver/chat_archiver/main.py index 4a808ac..4844086 100644 --- a/chat_archiver/chat_archiver/main.py +++ b/chat_archiver/chat_archiver/main.py @@ -1,6 +1,5 @@ import base64 -import errno import hashlib import json import logging @@ -19,15 +18,13 @@ from uuid import uuid4 import gevent.event import gevent.queue -from common import ensure_directory -from common.stats import timed +from common import ensure_directory, listdir +from common.chat import BATCH_INTERVAL, format_batch, get_batch_files, merge_messages from girc import Client from monotonic import monotonic import prometheus_client as prom -# How long each batch is -BATCH_INTERVAL = 60 # These are known to arrive up to MAX_DELAY after their actual time DELAYED_COMMANDS = [ @@ -271,16 +268,6 @@ class Archiver(object): self.client.stop() -def listdir(path): - """as os.listdir but return [] if dir doesn't exist""" - try: - return os.listdir(path) - except OSError as e: - if e.errno != errno.ENOENT: - raise - return [] - - def write_batch(path, batch_time, messages, size_histogram=None): """Batches are named PATH/YYYY-MM-DDTHH/MM:SS-HASH.json""" output = (format_batch(messages) + '\n').encode('utf-8') @@ -303,32 +290,6 @@ def write_batch(path, batch_time, messages, size_histogram=None): return filepath -def format_batch(messages): - # We need to take some care to have a consistent ordering and format here. - # We use a "canonicalised JSON" format, which is really just whatever the python encoder does, - # with compact separators and sorted keys. - messages = [ - (message, json.dumps(message, separators=(',', ':'), sort_keys=True)) - for message in messages - ] - # We sort by timestamp, then timestamp range, then if all else fails, lexiographically - # on the encoded representation. - messages.sort(key=lambda item: (item[0]['time'], item[0]['time_range'], item[1])) - return "\n".join(line for message, line in messages) - - -def get_batch_files(path, batch_time): - """Returns list of batch filepaths for a given batch time""" - hour = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H") - time = datetime.utcfromtimestamp(batch_time).strftime("%M:%S") - hourdir = os.path.join(path, hour) - return [ - os.path.join(hourdir, name) - for name in listdir(hourdir) - if name.startswith(time) and name.endswith(".json") - ] - - def merge_all(path, interval=None, stopping=None): """Repeatedly scans the batch directory for batch files with the same batch time, and merges them. By default, returns once it finds no duplicate files. @@ -418,100 +379,6 @@ def merge_batch_files(path, batch_time): if batch_file not in written: os.remove(batch_file) -@timed("merge_messages", normalize=lambda _, left, right: len(left) + len(right)) -def merge_messages(left, right): - """Merges two lists of messages into one merged list. - This operation should be a CRDT, ie. all the following hold: - - associative: merge(merge(A, B), C) == merge(A, merge(B, C)) - - commutitive: merge(A, B) == merge(B, A) - - reflexive: merge(A, A) == A - This means that no matter what order information from different sources - is incorporated (or if sources are repeated), the results should be the same. - """ - # An optimization - if either size is empty, return the other side without processing. - if not left: - return right - if not right: - return left - - # Calculates intersection of time range of both messages, or None if they don't overlap - def overlap(a, b): - range_start = max(a['time'], b['time']) - range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range']) - if range_end < range_start: - return None - return range_start, range_end - range_start - - # Returns merged message if two messages are compatible with being the same message, - # or else None. - def merge_message(a, b): - o = overlap(a, b) - if o and all( - a.get(k) == b.get(k) - for k in set(a.keys()) | set(b.keys()) - if k not in ("receivers", "time", "time_range") - ): - receivers = a["receivers"] | b["receivers"] - # Error checking - make sure no receiver timestamps are being overwritten. - # This would indicate we're merging two messages recieved at different times - # by the same recipient. - for k in receivers.keys(): - for old in (a, b): - if k in old and old[k] != receivers[k]: - raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}") - return a | { - "time": o[0], - "time_range": o[1], - "receivers": receivers, - } - return None - - # Match things with identical ids first, and collect unmatched into left and right lists - by_id = {} - unmatched = [], [] - for messages, u in zip((left, right), unmatched): - for message in messages: - id = (message.get('tags') or {}).get('id') - if id: - by_id.setdefault(id, []).append(message) - else: - u.append(message) - - result = [] - for id, messages in by_id.items(): - if len(messages) == 1: - logging.debug(f"Message with id {id} has no match") - result.append(messages[0]) - else: - merged = merge_message(*messages) - if merged is None: - raise ValueError(f"Got two non-matching messages with id {id}: {messages[0]}, {messages[1]}") - logging.debug(f"Merged messages with id {id}") - result.append(merged) - - # For time-range messages, pair off each one in left with first match in right, - # and pass through anything with no matches. - left_unmatched, right_unmatched = unmatched - for message in left_unmatched: - for other in right_unmatched: - merged = merge_message(message, other) - if merged: - logging.debug( - "Matched {m[command]} message {a[time]}+{a[time_range]} & {b[time]}+{b[time_range]} -> {m[time]}+{m[time_range]}" - .format(a=message, b=other, m=merged) - ) - right_unmatched.remove(other) - result.append(merged) - break - else: - logging.debug("No match found for {m[command]} at {m[time]}+{m[time_range]}".format(m=message)) - result.append(message) - for message in right_unmatched: - logging.debug("No match found for {m[command]} at {m[time]}+{m[time_range]}".format(m=message)) - result.append(message) - - return result - def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None, merge_interval=60): with open(oauth_token_path) as f: diff --git a/chat_archiver/chat_archiver/merge.py b/chat_archiver/chat_archiver/merge.py index 50a2abc..ad60710 100644 --- a/chat_archiver/chat_archiver/merge.py +++ b/chat_archiver/chat_archiver/merge.py @@ -3,7 +3,7 @@ import argh import logging import json -from .main import merge_messages, format_batch +from common.chat import merge_messages, format_batch def main(*paths, log='INFO'): """Merge all listed batch files and output result to stdout""" diff --git a/chat_archiver/chat_archiver/merge_all.py b/chat_archiver/chat_archiver/merge_all.py index a9f0e65..166b56c 100644 --- a/chat_archiver/chat_archiver/merge_all.py +++ b/chat_archiver/chat_archiver/merge_all.py @@ -1,9 +1,8 @@ import argh import logging -import json -from .main import merge_messages, format_batch, merge_all +from .main import merge_all def main(path, log='INFO'): """Merge all batch files with the same timestamp within given directory""" diff --git a/common/common/__init__.py b/common/common/__init__.py index f906d54..8f288c2 100644 --- a/common/common/__init__.py +++ b/common/common/__init__.py @@ -150,3 +150,13 @@ def serve_with_graceful_shutdown(server, stop_timeout=20): logging.info("Shutting down") server.stop(stop_timeout) logging.info("Gracefully shut down") + + +def listdir(path): + """as os.listdir but return [] if dir doesn't exist""" + try: + return os.listdir(path) + except OSError as e: + if e.errno != errno.ENOENT: + raise + return [] diff --git a/common/common/chat.py b/common/common/chat.py new file mode 100644 index 0000000..42c3cdb --- /dev/null +++ b/common/common/chat.py @@ -0,0 +1,152 @@ + +import json +import logging +import os +from datetime import datetime, timedelta + +from common import listdir +from common.stats import timed +from common.segments import hour_paths_for_range + + +# How long each batch is +BATCH_INTERVAL = 60 + + +def format_batch(messages): + # We need to take some care to have a consistent ordering and format here. + # We use a "canonicalised JSON" format, which is really just whatever the python encoder does, + # with compact separators and sorted keys. + messages = [ + (message, json.dumps(message, separators=(',', ':'), sort_keys=True)) + for message in messages + ] + # We sort by timestamp, then timestamp range, then if all else fails, lexiographically + # on the encoded representation. + messages.sort(key=lambda item: (item[0]['time'], item[0]['time_range'], item[1])) + return "\n".join(line for message, line in messages) + + +def get_batch_files(path, batch_time): + """Returns list of batch filepaths for a given batch time as unix timestamp""" + hour = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H") + time = datetime.utcfromtimestamp(batch_time).strftime("%M:%S") + hourdir = os.path.join(path, hour) + return [ + os.path.join(hourdir, name) + for name in listdir(hourdir) + if name.startswith(time) and name.endswith(".json") + ] + + +def get_batch_file_range(hours_path, start, end): + """Returns list of batch filepaths covering at least the time range given, but possibly longer. + May contain multiple results with the same timestamp. + start and end must be datetimes. + """ + # pad start and end to capture neighboring batches, including messages + # with a wide time range, which might actually be in an even earlier batch. + start -= timedelta(seconds=2 * BATCH_INTERVAL) + end += timedelta(seconds=BATCH_INTERVAL) + for hour_path in hour_paths_for_range(hours_path, start, end): + hour = os.path.basename(hour_path) + for name in listdir(hour_path): + min_sec = name.split("-")[0] + timestamp = datetime.strptime("{}:{}".format(hour, min_sec), "%Y-%m-%dT%H:%M:%S") + if start < timestamp < end: + yield os.path.join(hour_path, name) + + +@timed("merge_messages", normalize=lambda _, left, right: len(left) + len(right)) +def merge_messages(left, right): + """Merges two lists of messages into one merged list. + This operation should be a CRDT, ie. all the following hold: + - associative: merge(merge(A, B), C) == merge(A, merge(B, C)) + - commutitive: merge(A, B) == merge(B, A) + - reflexive: merge(A, A) == A + This means that no matter what order information from different sources + is incorporated (or if sources are repeated), the results should be the same. + """ + # An optimization - if either size is empty, return the other side without processing. + if not left: + return right + if not right: + return left + + # Calculates intersection of time range of both messages, or None if they don't overlap + def overlap(a, b): + range_start = max(a['time'], b['time']) + range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range']) + if range_end < range_start: + return None + return range_start, range_end - range_start + + # Returns merged message if two messages are compatible with being the same message, + # or else None. + def merge_message(a, b): + o = overlap(a, b) + if o and all( + a.get(k) == b.get(k) + for k in set(a.keys()) | set(b.keys()) + if k not in ("receivers", "time", "time_range") + ): + receivers = a["receivers"] | b["receivers"] + # Error checking - make sure no receiver timestamps are being overwritten. + # This would indicate we're merging two messages recieved at different times + # by the same recipient. + for k in receivers.keys(): + for old in (a, b): + if k in old and old[k] != receivers[k]: + raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}") + return a | { + "time": o[0], + "time_range": o[1], + "receivers": receivers, + } + return None + + # Match things with identical ids first, and collect unmatched into left and right lists + by_id = {} + unmatched = [], [] + for messages, u in zip((left, right), unmatched): + for message in messages: + id = (message.get('tags') or {}).get('id') + if id: + by_id.setdefault(id, []).append(message) + else: + u.append(message) + + result = [] + for id, messages in by_id.items(): + if len(messages) == 1: + logging.debug(f"Message with id {id} has no match") + result.append(messages[0]) + else: + merged = merge_message(*messages) + if merged is None: + raise ValueError(f"Got two non-matching messages with id {id}: {messages[0]}, {messages[1]}") + logging.debug(f"Merged messages with id {id}") + result.append(merged) + + # For time-range messages, pair off each one in left with first match in right, + # and pass through anything with no matches. + left_unmatched, right_unmatched = unmatched + for message in left_unmatched: + for other in right_unmatched: + merged = merge_message(message, other) + if merged: + logging.debug( + "Matched {m[command]} message {a[time]}+{a[time_range]} & {b[time]}+{b[time_range]} -> {m[time]}+{m[time_range]}" + .format(a=message, b=other, m=merged) + ) + right_unmatched.remove(other) + result.append(merged) + break + else: + logging.debug("No match found for {m[command]} at {m[time]}+{m[time_range]}".format(m=message)) + result.append(message) + for message in right_unmatched: + logging.debug("No match found for {m[command]} at {m[time]}+{m[time_range]}".format(m=message)) + result.append(message) + + return result diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index ee45ea5..c64cc50 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -17,6 +17,7 @@ from gevent.pywsgi import WSGIServer from common import dateutil, get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, PromLogCountsHandler, install_stacksampler, serve_with_graceful_shutdown from common.flask_stats import request_stats, after_request from common.segments import feed_input, render_segments_waveform, extract_frame, list_segment_files +from common.chat import get_batch_file_range, merge_messages from . import generate_hls @@ -91,8 +92,8 @@ def metrics(): @app.route('/metrics/') @request_stats def metrics_with_trailing(trailing): - """Expose Prometheus metrics.""" - return prom.generate_latest() + """Expose Prometheus metrics.""" + return prom.generate_latest() @app.route('/files') @request_stats @@ -393,6 +394,60 @@ def get_frame(channel, quality): return Response(extract_frame(segments, timestamp), mimetype='image/png') +@app.route('//chat.json') +@request_stats +@has_path_args +def get_chat_messages(channel): + """ + Returns a JSON list of chat messages from the given time range. + The messages are in the same format as used in the chat archiver. + Messages without an exact known time are included if their possible time range + intersects with the requested time range. Note this means that messages in range (A, B) + and range (B, C) may overlap! Thankfully the kinds of messages this can happen for mostly + don't matter - JOINs and PARTs mainly, but sometimes ROOMSTATEs, NOTICEs and CLEARCHATs. + Params: + start, end: Required. The start and end times. + Must be in ISO 8601 format (ie. yyyy-mm-ddTHH:MM:SS) and UTC. + """ + start = dateutil.parse_utc_only(request.args['start']) + end = dateutil.parse_utc_only(request.args['end']) + if end <= start: + return "End must be after start", 400 + + hours_path = os.path.join(app.static_folder, channel, "chat") + + # This process below may fail if a batch is deleted out from underneath us. + # If that happens, we need to start again. + retry = True + while retry: + retry = False + messages = [] + for batch_file in get_batch_file_range(hours_path, start, end): + try: + with open(batch_file) as f: + batch = f.read() + except OSError as e: + if e.errno != errno.ENOENT: + raise + # If file doesn't exist, retry the outer loop + retry = True + break + batch = [json.loads(line) for line in batch.strip().split("\n")] + messages = merge_messages(messages, batch) + + start = start.timestamp() + end = end.timestamp() + messages = sorted( + [ + m for m in messages + # message ends after START, and starts before END + if start <= m['time'] + m['time_range'] and m['time'] < end + ], key=lambda m: (m['time'], m['time_range']) + ) + + return json.dumps(messages) + + @app.route('/generate_videos//', methods=['POST']) @request_stats @has_path_args