Add restreamer endpoint for viewing chat messages

pull/300/head
Mike Lang 2 years ago committed by Mike Lang
parent 9320251de7
commit 08257386e2

@ -1,6 +1,5 @@
import base64 import base64
import errno
import hashlib import hashlib
import json import json
import logging import logging
@ -19,15 +18,13 @@ from uuid import uuid4
import gevent.event import gevent.event
import gevent.queue import gevent.queue
from common import ensure_directory from common import ensure_directory, listdir
from common.stats import timed from common.chat import BATCH_INTERVAL, format_batch, get_batch_files, merge_messages
from girc import Client from girc import Client
from monotonic import monotonic from monotonic import monotonic
import prometheus_client as prom import prometheus_client as prom
# How long each batch is
BATCH_INTERVAL = 60
# These are known to arrive up to MAX_DELAY after their actual time # These are known to arrive up to MAX_DELAY after their actual time
DELAYED_COMMANDS = [ DELAYED_COMMANDS = [
@ -271,16 +268,6 @@ class Archiver(object):
self.client.stop() self.client.stop()
def listdir(path):
"""as os.listdir but return [] if dir doesn't exist"""
try:
return os.listdir(path)
except OSError as e:
if e.errno != errno.ENOENT:
raise
return []
def write_batch(path, batch_time, messages, size_histogram=None): def write_batch(path, batch_time, messages, size_histogram=None):
"""Batches are named PATH/YYYY-MM-DDTHH/MM:SS-HASH.json""" """Batches are named PATH/YYYY-MM-DDTHH/MM:SS-HASH.json"""
output = (format_batch(messages) + '\n').encode('utf-8') output = (format_batch(messages) + '\n').encode('utf-8')
@ -303,32 +290,6 @@ def write_batch(path, batch_time, messages, size_histogram=None):
return filepath return filepath
def format_batch(messages):
# We need to take some care to have a consistent ordering and format here.
# We use a "canonicalised JSON" format, which is really just whatever the python encoder does,
# with compact separators and sorted keys.
messages = [
(message, json.dumps(message, separators=(',', ':'), sort_keys=True))
for message in messages
]
# We sort by timestamp, then timestamp range, then if all else fails, lexiographically
# on the encoded representation.
messages.sort(key=lambda item: (item[0]['time'], item[0]['time_range'], item[1]))
return "\n".join(line for message, line in messages)
def get_batch_files(path, batch_time):
"""Returns list of batch filepaths for a given batch time"""
hour = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H")
time = datetime.utcfromtimestamp(batch_time).strftime("%M:%S")
hourdir = os.path.join(path, hour)
return [
os.path.join(hourdir, name)
for name in listdir(hourdir)
if name.startswith(time) and name.endswith(".json")
]
def merge_all(path, interval=None, stopping=None): def merge_all(path, interval=None, stopping=None):
"""Repeatedly scans the batch directory for batch files with the same batch time, and merges them. """Repeatedly scans the batch directory for batch files with the same batch time, and merges them.
By default, returns once it finds no duplicate files. By default, returns once it finds no duplicate files.
@ -418,100 +379,6 @@ def merge_batch_files(path, batch_time):
if batch_file not in written: if batch_file not in written:
os.remove(batch_file) os.remove(batch_file)
@timed("merge_messages", normalize=lambda _, left, right: len(left) + len(right))
def merge_messages(left, right):
"""Merges two lists of messages into one merged list.
This operation should be a CRDT, ie. all the following hold:
- associative: merge(merge(A, B), C) == merge(A, merge(B, C))
- commutitive: merge(A, B) == merge(B, A)
- reflexive: merge(A, A) == A
This means that no matter what order information from different sources
is incorporated (or if sources are repeated), the results should be the same.
"""
# An optimization - if either size is empty, return the other side without processing.
if not left:
return right
if not right:
return left
# Calculates intersection of time range of both messages, or None if they don't overlap
def overlap(a, b):
range_start = max(a['time'], b['time'])
range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range'])
if range_end < range_start:
return None
return range_start, range_end - range_start
# Returns merged message if two messages are compatible with being the same message,
# or else None.
def merge_message(a, b):
o = overlap(a, b)
if o and all(
a.get(k) == b.get(k)
for k in set(a.keys()) | set(b.keys())
if k not in ("receivers", "time", "time_range")
):
receivers = a["receivers"] | b["receivers"]
# Error checking - make sure no receiver timestamps are being overwritten.
# This would indicate we're merging two messages recieved at different times
# by the same recipient.
for k in receivers.keys():
for old in (a, b):
if k in old and old[k] != receivers[k]:
raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
return a | {
"time": o[0],
"time_range": o[1],
"receivers": receivers,
}
return None
# Match things with identical ids first, and collect unmatched into left and right lists
by_id = {}
unmatched = [], []
for messages, u in zip((left, right), unmatched):
for message in messages:
id = (message.get('tags') or {}).get('id')
if id:
by_id.setdefault(id, []).append(message)
else:
u.append(message)
result = []
for id, messages in by_id.items():
if len(messages) == 1:
logging.debug(f"Message with id {id} has no match")
result.append(messages[0])
else:
merged = merge_message(*messages)
if merged is None:
raise ValueError(f"Got two non-matching messages with id {id}: {messages[0]}, {messages[1]}")
logging.debug(f"Merged messages with id {id}")
result.append(merged)
# For time-range messages, pair off each one in left with first match in right,
# and pass through anything with no matches.
left_unmatched, right_unmatched = unmatched
for message in left_unmatched:
for other in right_unmatched:
merged = merge_message(message, other)
if merged:
logging.debug(
"Matched {m[command]} message {a[time]}+{a[time_range]} & {b[time]}+{b[time_range]} -> {m[time]}+{m[time_range]}"
.format(a=message, b=other, m=merged)
)
right_unmatched.remove(other)
result.append(merged)
break
else:
logging.debug("No match found for {m[command]} at {m[time]}+{m[time_range]}".format(m=message))
result.append(message)
for message in right_unmatched:
logging.debug("No match found for {m[command]} at {m[time]}+{m[time_range]}".format(m=message))
result.append(message)
return result
def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None, merge_interval=60): def main(channel, nick, oauth_token_path, base_dir='/mnt', name=None, merge_interval=60):
with open(oauth_token_path) as f: with open(oauth_token_path) as f:

@ -3,7 +3,7 @@ import argh
import logging import logging
import json import json
from .main import merge_messages, format_batch from common.chat import merge_messages, format_batch
def main(*paths, log='INFO'): def main(*paths, log='INFO'):
"""Merge all listed batch files and output result to stdout""" """Merge all listed batch files and output result to stdout"""

@ -1,9 +1,8 @@
import argh import argh
import logging import logging
import json
from .main import merge_messages, format_batch, merge_all from .main import merge_all
def main(path, log='INFO'): def main(path, log='INFO'):
"""Merge all batch files with the same timestamp within given directory""" """Merge all batch files with the same timestamp within given directory"""

@ -150,3 +150,13 @@ def serve_with_graceful_shutdown(server, stop_timeout=20):
logging.info("Shutting down") logging.info("Shutting down")
server.stop(stop_timeout) server.stop(stop_timeout)
logging.info("Gracefully shut down") logging.info("Gracefully shut down")
def listdir(path):
"""as os.listdir but return [] if dir doesn't exist"""
try:
return os.listdir(path)
except OSError as e:
if e.errno != errno.ENOENT:
raise
return []

@ -0,0 +1,152 @@
import json
import logging
import os
from datetime import datetime, timedelta
from common import listdir
from common.stats import timed
from common.segments import hour_paths_for_range
# How long each batch is
BATCH_INTERVAL = 60
def format_batch(messages):
# We need to take some care to have a consistent ordering and format here.
# We use a "canonicalised JSON" format, which is really just whatever the python encoder does,
# with compact separators and sorted keys.
messages = [
(message, json.dumps(message, separators=(',', ':'), sort_keys=True))
for message in messages
]
# We sort by timestamp, then timestamp range, then if all else fails, lexiographically
# on the encoded representation.
messages.sort(key=lambda item: (item[0]['time'], item[0]['time_range'], item[1]))
return "\n".join(line for message, line in messages)
def get_batch_files(path, batch_time):
"""Returns list of batch filepaths for a given batch time as unix timestamp"""
hour = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H")
time = datetime.utcfromtimestamp(batch_time).strftime("%M:%S")
hourdir = os.path.join(path, hour)
return [
os.path.join(hourdir, name)
for name in listdir(hourdir)
if name.startswith(time) and name.endswith(".json")
]
def get_batch_file_range(hours_path, start, end):
"""Returns list of batch filepaths covering at least the time range given, but possibly longer.
May contain multiple results with the same timestamp.
start and end must be datetimes.
"""
# pad start and end to capture neighboring batches, including messages
# with a wide time range, which might actually be in an even earlier batch.
start -= timedelta(seconds=2 * BATCH_INTERVAL)
end += timedelta(seconds=BATCH_INTERVAL)
for hour_path in hour_paths_for_range(hours_path, start, end):
hour = os.path.basename(hour_path)
for name in listdir(hour_path):
min_sec = name.split("-")[0]
timestamp = datetime.strptime("{}:{}".format(hour, min_sec), "%Y-%m-%dT%H:%M:%S")
if start < timestamp < end:
yield os.path.join(hour_path, name)
@timed("merge_messages", normalize=lambda _, left, right: len(left) + len(right))
def merge_messages(left, right):
"""Merges two lists of messages into one merged list.
This operation should be a CRDT, ie. all the following hold:
- associative: merge(merge(A, B), C) == merge(A, merge(B, C))
- commutitive: merge(A, B) == merge(B, A)
- reflexive: merge(A, A) == A
This means that no matter what order information from different sources
is incorporated (or if sources are repeated), the results should be the same.
"""
# An optimization - if either size is empty, return the other side without processing.
if not left:
return right
if not right:
return left
# Calculates intersection of time range of both messages, or None if they don't overlap
def overlap(a, b):
range_start = max(a['time'], b['time'])
range_end = min(a['time'] + a['time_range'], b['time'] + b['time_range'])
if range_end < range_start:
return None
return range_start, range_end - range_start
# Returns merged message if two messages are compatible with being the same message,
# or else None.
def merge_message(a, b):
o = overlap(a, b)
if o and all(
a.get(k) == b.get(k)
for k in set(a.keys()) | set(b.keys())
if k not in ("receivers", "time", "time_range")
):
receivers = a["receivers"] | b["receivers"]
# Error checking - make sure no receiver timestamps are being overwritten.
# This would indicate we're merging two messages recieved at different times
# by the same recipient.
for k in receivers.keys():
for old in (a, b):
if k in old and old[k] != receivers[k]:
raise ValueError(f"Merge would merge two messages with different recipient timestamps: {a}, {b}")
return a | {
"time": o[0],
"time_range": o[1],
"receivers": receivers,
}
return None
# Match things with identical ids first, and collect unmatched into left and right lists
by_id = {}
unmatched = [], []
for messages, u in zip((left, right), unmatched):
for message in messages:
id = (message.get('tags') or {}).get('id')
if id:
by_id.setdefault(id, []).append(message)
else:
u.append(message)
result = []
for id, messages in by_id.items():
if len(messages) == 1:
logging.debug(f"Message with id {id} has no match")
result.append(messages[0])
else:
merged = merge_message(*messages)
if merged is None:
raise ValueError(f"Got two non-matching messages with id {id}: {messages[0]}, {messages[1]}")
logging.debug(f"Merged messages with id {id}")
result.append(merged)
# For time-range messages, pair off each one in left with first match in right,
# and pass through anything with no matches.
left_unmatched, right_unmatched = unmatched
for message in left_unmatched:
for other in right_unmatched:
merged = merge_message(message, other)
if merged:
logging.debug(
"Matched {m[command]} message {a[time]}+{a[time_range]} & {b[time]}+{b[time_range]} -> {m[time]}+{m[time_range]}"
.format(a=message, b=other, m=merged)
)
right_unmatched.remove(other)
result.append(merged)
break
else:
logging.debug("No match found for {m[command]} at {m[time]}+{m[time_range]}".format(m=message))
result.append(message)
for message in right_unmatched:
logging.debug("No match found for {m[command]} at {m[time]}+{m[time_range]}".format(m=message))
result.append(message)
return result

@ -17,6 +17,7 @@ from gevent.pywsgi import WSGIServer
from common import dateutil, get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, PromLogCountsHandler, install_stacksampler, serve_with_graceful_shutdown from common import dateutil, get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, PromLogCountsHandler, install_stacksampler, serve_with_graceful_shutdown
from common.flask_stats import request_stats, after_request from common.flask_stats import request_stats, after_request
from common.segments import feed_input, render_segments_waveform, extract_frame, list_segment_files from common.segments import feed_input, render_segments_waveform, extract_frame, list_segment_files
from common.chat import get_batch_file_range, merge_messages
from . import generate_hls from . import generate_hls
@ -393,6 +394,60 @@ def get_frame(channel, quality):
return Response(extract_frame(segments, timestamp), mimetype='image/png') return Response(extract_frame(segments, timestamp), mimetype='image/png')
@app.route('/<channel>/chat.json')
@request_stats
@has_path_args
def get_chat_messages(channel):
"""
Returns a JSON list of chat messages from the given time range.
The messages are in the same format as used in the chat archiver.
Messages without an exact known time are included if their possible time range
intersects with the requested time range. Note this means that messages in range (A, B)
and range (B, C) may overlap! Thankfully the kinds of messages this can happen for mostly
don't matter - JOINs and PARTs mainly, but sometimes ROOMSTATEs, NOTICEs and CLEARCHATs.
Params:
start, end: Required. The start and end times.
Must be in ISO 8601 format (ie. yyyy-mm-ddTHH:MM:SS) and UTC.
"""
start = dateutil.parse_utc_only(request.args['start'])
end = dateutil.parse_utc_only(request.args['end'])
if end <= start:
return "End must be after start", 400
hours_path = os.path.join(app.static_folder, channel, "chat")
# This process below may fail if a batch is deleted out from underneath us.
# If that happens, we need to start again.
retry = True
while retry:
retry = False
messages = []
for batch_file in get_batch_file_range(hours_path, start, end):
try:
with open(batch_file) as f:
batch = f.read()
except OSError as e:
if e.errno != errno.ENOENT:
raise
# If file doesn't exist, retry the outer loop
retry = True
break
batch = [json.loads(line) for line in batch.strip().split("\n")]
messages = merge_messages(messages, batch)
start = start.timestamp()
end = end.timestamp()
messages = sorted(
[
m for m in messages
# message ends after START, and starts before END
if start <= m['time'] + m['time_range'] and m['time'] < end
], key=lambda m: (m['time'], m['time_range'])
)
return json.dumps(messages)
@app.route('/generate_videos/<channel>/<quality>', methods=['POST']) @app.route('/generate_videos/<channel>/<quality>', methods=['POST'])
@request_stats @request_stats
@has_path_args @has_path_args

Loading…
Cancel
Save