Set up plumbing for multi-range videos and implement no-transition fast cut videos only

This is the simplest case as we can just cut each range like we already do,
then concat the results.

We still allow for the full design in the database and cutter, but error out if transitions
is ever anything but hard cuts or if it's a full cut.

We also update the restreamer to allow accepting ranges, however for usability we still allow
the old "just one start and end" args.

Note this changes the thrimshim API to give and take the new "video_ranges" and "video_transitions" columns.
pull/237/head
Mike Lang 3 years ago committed by Mike Lang
parent e4204d9b9a
commit aab8cf2f0f

@ -137,7 +137,8 @@ columns | type | role |
`allow_holes` | `BOOLEAN NOT NULL DEFAULT FALSE` | edit input | If false, any missing segments encountered while cutting will cause the cut to fail. Setting this to true should be done by an operator to indicate that holes are expected in this range. It is also the operator's responsibility to ensure that all allowed cutters have all segments that they can get, since there is no guarentee that only the cutter with the least missing segments will get the cut job.
`uploader_whitelist` | `TEXT[]` | edit input | List of uploaders which are allowed to cut this entry, or NULL to indicate no restriction. This is useful if you are allowing holes and the amount of missing data differs between nodes (this shouldn't happen - this would mean replication is also failing), or if an operator is investigating a problem with a specific node.
`upload_location` | `TEXT` | edit input | The upload location to upload the cut video to. This is used by the cutter, and must match one of the cutter's configured upload locations. If it does not, the cutter will not claim the event.
`video_start`, `video_end` | `TIMESTAMP` | edit input | Start and end time of the video to cut. If already set, used as the default trim times when editing.
`video_ranges` | `{start TIMESTAMP, end TIMESTAMP}[]` | edit input | A non-zero number of start and end times, describing the ranges of video to cut. They will be cut back-to-back in the given order, with the transitions between as per `video_transitions`. If already set, used as the default range settings when editing.
`video_transitions` | `{type TEXT, duration INTERVAL}[]` | edit input | Defines how to transition between each range defined in `video_ranges`, and must be exactly the length of `video_ranges` minus 1. Each index in `video_transitions` defines the transition between the range with the same index in `video_ranges` and the next one. Transitions either specify a transition type as understood by `ffmpeg`'s `xfade` filter and a duration (amount of overlap), or can be NULL to indicate a hard cut.
`video_title` | `TEXT` | edit input | The title of the video. If already set, used as the default title when editing instead of `description`.
`video_description` | `TEXT` | edit input | The description field of the video. If already set, used as the default description when editing instead of `description`.
`video_tags` | `TEXT[]` | edit input | Custom tags to annotate the video with. If already set, used as the default when editing instead of `tags`.

@ -8,11 +8,33 @@ as a whole does not to avoid needing to install them for components that don't n
from contextlib import contextmanager
import psycopg2
import psycopg2.sql
import psycopg2.extensions
import psycopg2.extras
from psycogreen.gevent import patch_psycopg
COMPOSITE_TYPES = [
"video_range",
"video_transition",
]
COLUMN_CASTS = {
"video_ranges": "video_range[]",
"video_transitions": "video_transition[]",
}
def get_column_placeholder(column):
"""Get a placeholder (like "%(COLUMN)s") to use in constructed SQL queries
for a given column in the events table. This function is needed because
some columns have types that require explicit casts to be included."""
placeholder = psycopg2.sql.Placeholder(column)
if column in COLUMN_CASTS:
placeholder = psycopg2.sql.SQL("{}::{}").format(
placeholder, psycopg2.sql.SQL(COLUMN_CASTS[column])
)
return placeholder
class DBManager(object):
"""Patches psycopg2 before any connections are created. Stores connect info
for easy creation of new connections, and sets some defaults before
@ -44,6 +66,8 @@ class DBManager(object):
# searches or targetted single-row updates.
conn.isolation_level = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
conn.autocommit = True
for composite in COMPOSITE_TYPES:
psycopg2.extras.register_composite(composite, conn)
return conn

@ -360,12 +360,20 @@ def read_chunks(fileobj, chunk_size=16*1024):
yield chunk
@timed('cut', cut_type='rough', normalize=lambda _, segments, start, end: (end - start).total_seconds())
def rough_cut_segments(segments, start, end):
"""Yields chunks of a MPEGTS video file covering at least the timestamp range,
likely with a few extra seconds on either side.
def range_total(ranges):
return sum([
end - start for start, end in ranges
], datetime.timedelta()).total_seconds()
@timed('cut', cut_type='rough', normalize=lambda ret, sr, ranges: range_total(ranges))
def rough_cut_segments(segment_ranges, ranges):
"""Yields chunks of a MPEGTS video file covering at least the timestamp ranges,
likely with a few extra seconds on either side of each range. Ranges are cut between
with no transitions.
This method works by simply concatenating all the segments, without any re-encoding.
"""
for segments in segment_ranges:
for segment in segments:
if segment is None:
continue
@ -374,14 +382,27 @@ def rough_cut_segments(segments, start, end):
yield chunk
@timed('cut', cut_type='fast', normalize=lambda _, segments, start, end: (end - start).total_seconds())
def fast_cut_segments(segments, start, end):
"""Yields chunks of a MPEGTS video file covering the exact timestamp range.
segments should be a list of segments as returned by get_best_segments().
This method works by only cutting the first and last segments, and concatenating the rest.
@timed('cut', cut_type='fast', normalize=lambda ret, sr, ranges: range_total(ranges))
def fast_cut_segments(segment_ranges, ranges):
"""Yields chunks of a MPEGTS video file covering the exact timestamp ranges.
segments should be a list of segment lists as returned by get_best_segments() for each range.
This method works by only cutting the first and last segments of each range,
and concatenating everything together. Ranges are cut between with no transitions.
This only works if the same codec settings etc are used across all segments.
This should almost always be true but may cause weird results if not.
"""
if len(segment_ranges) != len(ranges):
raise ValueError("You need to provide one segment list for each range")
for segments, (start, end) in zip(segment_ranges, ranges):
# We could potentially optimize here by cutting all firsts/lasts in parallel
# instead of doing them in order, but that's probably not that helpful and would
# greatly complicate things.
yield from fast_cut_range(segments, start, end)
@timed('cut_range', cut_type='fast', normalize=lambda _, segments, start, end: (end - start).total_seconds())
def fast_cut_range(segments, start, end):
"""Does a fast cut for an individual range of segments"""
# how far into the first segment to begin (if no hole at start)
cut_start = None

@ -15,7 +15,7 @@ import requests
from psycopg2 import sql
import common
from common.database import DBManager, query
from common.database import DBManager, query, get_column_placeholder
from common.segments import get_best_segments, fast_cut_segments, full_cut_segments, ContainsHoles
from common.stats import timed
@ -58,8 +58,8 @@ CUT_JOB_PARAMS = [
"allow_holes",
"uploader_whitelist",
"upload_location",
"video_start",
"video_end",
"video_ranges",
"video_transitions",
"video_title",
"video_description",
"video_tags",
@ -68,18 +68,35 @@ CUT_JOB_PARAMS = [
]
CutJob = namedtuple('CutJob', [
"id",
# the list of segments as returned by get_best_segments()
"segments",
# for each range, the list of segments as returned by get_best_segments()
"segment_ranges",
# params which map directly from DB columns
] + CUT_JOB_PARAMS)
def get_duration(job):
"""Get total video duration of a job, in seconds"""
# Due to ranges and transitions, this is actually non-trivial to calculate.
# Each range overlaps the previous by duration, so we add all the ranges
# then subtract all the durations.
without_transitions = sum([
range.end - range.start
for range in job.video_ranges
], datetime.timedelta())
overlap = sum([
transition.duration
for transition in job.video_transitions
if transition is not None
], datetime.timedelta())
return (without_transitions - overlap).total_seconds()
def format_job(job):
"""Convert candidate row or CutJob to human-readable string"""
return "{job.id}({start}/{duration}s {job.video_title!r})".format(
job=job,
start=job.video_start.isoformat(),
duration=(job.video_end - job.video_start).total_seconds(),
start=job.video_ranges[0].start.isoformat(),
duration=get_duration(job),
)
@ -180,7 +197,7 @@ class Cutter(object):
self.logger.info("Set error for candidate {}".format(format_job(candidate)))
try:
segments = self.check_candidate(candidate)
segment_ranges = self.check_candidate(candidate)
except ContainsHoles:
self.logger.info("Ignoring candidate {} due to holes".format(format_job(candidate)))
set_error(
@ -188,7 +205,8 @@ class Cutter(object):
"This may just be because it's too recent and the video hasn't been downloaded yet. "
"However, it might also mean that there is a 'hole' of missing video, perhaps "
"because the stream went down or due to downloader issues. If you know why this "
"is happening and want to cut the video anyway, re-edit with the 'Allow Holes' option set."
"is happening and want to cut the video anyway, re-edit with the 'Allow Holes' option set. "
"However, even with 'Allow Holes', this will still fail if any range of video is missing entirely."
.format(self.name))
continue # bad candidate, let someone else take it or just try again later
except Exception as e:
@ -202,11 +220,7 @@ class Cutter(object):
self.wait(self.ERROR_RETRY_INTERVAL)
continue
if all(segment is None for segment in segments):
self.logger.info("Ignoring candidate {} as we have no segments".format(format_job(candidate)))
continue
return CutJob(segments=segments, **candidate._asdict())
return CutJob(segment_ranges=segment_ranges, **candidate._asdict())
# No candidates
no_candidates.inc()
@ -229,18 +243,31 @@ class Cutter(object):
result = query(self.conn, built_query, name=self.name, upload_locations=list(self.upload_locations.keys()))
return result.fetchall()
# No need to instrument this function, just use get_best_segments() stats
@timed(
video_channel = lambda ret, self, job: job.video_channel,
video_quality = lambda ret, self, job: job.video_quality,
range_count = lambda ret, self, job: len(job.video_ranges),
normalize = lambda ret, self, job: get_duration(job),
)
def check_candidate(self, candidate):
return get_best_segments(
os.path.join(self.segments_path, candidate.video_channel, candidate.video_quality),
candidate.video_start,
candidate.video_end,
# Gather segment lists. Abort early if we find a range for which we have no segments at all.
hours_path = os.path.join(self.segments_path, candidate.video_channel, candidate.video_quality)
segment_ranges = []
for range in candidate.video_ranges:
segments = get_best_segments(
hours_path,
range.start,
range.end,
allow_holes=candidate.allow_holes,
)
if segments == [None]:
raise ContainsHoles
segment_ranges.append(segments)
return segment_ranges
@timed(
video_channel = lambda self, job: job.video_channel,
video_quality = lambda self, job: job.video_quality,
video_channel = lambda ret, self, job: job.video_channel,
video_quality = lambda ret, self, job: job.video_quality,
)
def claim_job(self, job):
"""Update event in DB to say we're working on it.
@ -257,7 +284,7 @@ class Cutter(object):
# A built AND over all CUT_JOB_PARAMS to check key = %(key)s.
# Note the use of IS NOT DISTINCT FROM because key = NULL is false if key is NULL.
sql.SQL(' AND ').join(
sql.SQL("{} IS NOT DISTINCT FROM {}").format(sql.Identifier(key), sql.Placeholder(key))
sql.SQL("{} IS NOT DISTINCT FROM {}").format(sql.Identifier(key), get_column_placeholder(key))
for key in CUT_JOB_PARAMS
)
)
@ -300,14 +327,19 @@ class Cutter(object):
if upload_backend.encoding_settings is None:
self.logger.debug("No encoding settings, using fast cut")
cut = fast_cut_segments(job.segments, job.video_start, job.video_end)
if any(transition is not None for transition in job.video_transitions):
raise ValueError("Fast cuts do not support complex transitions")
cut = fast_cut_segments(job.segment_ranges, job.video_ranges)
else:
self.logger.debug("Using encoding settings for {} cut: {}".format(
"streamable" if upload_backend.encoding_streamable else "non-streamable",
upload_backend.encoding_settings,
))
if len(job.video_ranges) > 1:
raise ValueError("Full cuts do not support multiple ranges")
range = job.video_ranges[0]
cut = full_cut_segments(
job.segments, job.video_start, job.video_end,
job.segment_ranges[0], range.start, range.end,
upload_backend.encoding_settings, stream=upload_backend.encoding_streamable,
)
@ -333,7 +365,7 @@ class Cutter(object):
WHERE id = %(id)s AND uploader = %(name)s
""").format(sql.SQL(", ").join(
sql.SQL("{} = {}").format(
sql.Identifier(key), sql.Placeholder(key),
sql.Identifier(key), get_column_placeholder(key),
) for key in kwargs
))
result = query(self.conn, built_query, id=job.id, name=self.name, **kwargs)
@ -583,7 +615,7 @@ def main(
same place just with different settings, and you only want one of them
to actually do the check.
cut_type:
One of 'fast' or 'full'. Default 'fast'. This indicates whether to use
One of 'fast' or 'full'. Default 'full'. This indicates whether to use
fast_cut_segments() or full_cut_segments() for this location.
along with any additional config options defined for that backend type.

@ -72,7 +72,7 @@ class PlaylistManager(object):
# the next time.
conn = self.dbmanager.get_conn()
videos = query(conn, """
SELECT video_id, tags, COALESCE(video_start, event_start) AS start_time
SELECT video_id, tags, COALESCE((video_ranges[1]).start, event_start) AS start_time
FROM events
WHERE state = 'DONE' AND upload_location = ANY (%s)
""", self.upload_locations)

@ -46,6 +46,15 @@ CREATE TYPE event_state as ENUM (
'DONE'
);
CREATE TYPE video_range as (
start TIMESTAMP,
"end" TIMESTAMP
);
CREATE TYPE video_transition as (
type TEXT,
duration INTERVAL
);
CREATE TABLE events (
id UUID PRIMARY KEY,
@ -62,8 +71,12 @@ CREATE TABLE events (
allow_holes BOOLEAN NOT NULL DEFAULT FALSE,
uploader_whitelist TEXT[],
upload_location TEXT CHECK (state = 'UNEDITED' OR upload_location IS NOT NULL),
video_start TIMESTAMP CHECK (state IN ('UNEDITED', 'DONE') OR video_start IS NOT NULL),
video_end TIMESTAMP CHECK (state IN ('UNEDITED', 'DONE') OR video_end IS NOT NULL),
video_ranges video_range[] CHECK (state IN ('UNEDITED', 'DONE') OR video_ranges IS NOT NULL),
video_transitions video_transition[] CHECK (state IN ('UNEDITED', 'DONE') OR video_transitions IS NOT NULL),
CHECK (
(video_ranges IS NULL AND video_transitions IS NULL)
OR CARDINALITY(video_ranges) = CARDINALITY(video_transitions) + 1
),
video_title TEXT CHECK (state IN ('UNEDITED', 'DONE') OR video_title IS NOT NULL),
video_description TEXT CHECK (state IN ('UNEDITED', 'DONE') OR video_description IS NOT NULL),
video_tags TEXT[] CHECK (state IN ('UNEDITED', 'DONE') OR video_tags IS NOT NULL),

@ -238,12 +238,19 @@ def generate_media_playlist(channel, quality):
def cut(channel, quality):
"""Return a MPEGTS video file covering the exact timestamp range.
Params:
start, end: Required. The start and end times, down to the millisecond.
start, end: The start and end times, down to the millisecond.
Must be in ISO 8601 format (ie. yyyy-mm-ddTHH:MM:SS) and UTC.
If not given (and ranges not given), will use the earliest/latest data available.
range: A pair "START,END" which are formatted as per start and end args.
Overrides "start" and "end" options.
This option may be given multiple times.
The final video will consist of all the ranges cut back to back,
in the order given, with hard cuts between each range.
allow_holes: Optional, default false. If false, errors out with a 406 Not Acceptable
if any holes are detected, rather than producing a video with missing parts.
Set to true by passing "true" (case insensitive).
Even if holes are allowed, a 406 may result if the resulting video would be empty.
Even if holes are allowed, a 406 may result if the resulting video (or any individual
range) would be empty.
type: One of:
"rough": A direct concat, like a fast cut but without any ffmpeg.
It may extend beyond the requested start and end times by a few seconds.
@ -251,9 +258,14 @@ def cut(channel, quality):
the other segments.
"mpegts": A full cut to a streamable mpegts format. This consumes signifigant server
resources, so please use sparingly.
"mp4": As mpegts, but encodes as MP4. This format must be buffered to disk before
sending so it's a bit slower.
"""
if 'range' in request.args:
parts = [part.split(',') for part in request.args.getlist('range')]
ranges = [
(dateutil.parse_utc_only(start), dateutil.parse_utc_only(end))
for start, end in parts
]
else:
start = dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None
end = dateutil.parse_utc_only(request.args['end']) if 'end' in request.args else None
if start is None or end is None:
@ -263,9 +275,11 @@ def cut(channel, quality):
start = first
if end is None:
end = last
ranges = [(start, end)]
for start, end in ranges:
if end <= start:
return "End must be after start", 400
return "Ends must be after starts", 400
allow_holes = request.args.get('allow_holes', 'false').lower()
if allow_holes not in ["true", "false"]:
@ -276,25 +290,30 @@ def cut(channel, quality):
if not os.path.isdir(hours_path):
abort(404)
segment_ranges = []
for start, end in ranges:
segments = get_best_segments(hours_path, start, end)
if not allow_holes and None in segments:
return "Requested time range contains holes or is incomplete.", 406
if not any(segment is not None for segment in segments):
return "We have no content available within the requested time range.", 406
segment_ranges.append(segments)
type = request.args.get('type', 'fast')
if type == 'rough':
return Response(rough_cut_segments(segments, start, end), mimetype='video/MP2T')
return Response(rough_cut_segments(segment_ranges, ranges), mimetype='video/MP2T')
elif type == 'fast':
return Response(fast_cut_segments(segments, start, end), mimetype='video/MP2T')
return Response(fast_cut_segments(segment_ranges, ranges), mimetype='video/MP2T')
elif type in ('mpegts', 'mp4'):
if type == 'mp4':
return "mp4 type has been disabled due to the load it causes", 400
# encode as high-quality, without wasting too much cpu on encoding
stream, muxer, mimetype = (True, 'mpegts', 'video/MP2T') if type == 'mpegts' else (False, 'mp4', 'video/mp4')
encoding_args = ['-c:v', 'libx264', '-preset', 'ultrafast', '-crf', '0', '-f', muxer]
return Response(full_cut_segments(segments, start, end, encoding_args, stream=stream), mimetype=mimetype)
if len(ranges) > 1:
return "full cut does not support multiple ranges at this time", 400
start, end = ranges[0]
return Response(full_cut_segments(segment_ranges[0], start, end, encoding_args, stream=stream), mimetype=mimetype)
else:
return "Unknown type {!r}".format(type), 400

@ -16,7 +16,7 @@ from requests import HTTPError
import common
import common.dateutil
from common.database import DBManager, query
from common.database import DBManager, query, get_column_placeholder
from .sheets import Sheets
@ -276,7 +276,7 @@ class SheetSync(object):
ON CONFLICT DO NOTHING
""").format(
sql.SQL(", ").join(sql.Identifier(col) for col in insert_cols),
sql.SQL(", ").join(sql.Placeholder(col) for col in insert_cols),
sql.SQL(", ").join(get_column_placeholder(col) for col in insert_cols),
)
query(self.conn, built_query, sheet_name=worksheet, **row)
rows_found.labels(worksheet).inc()
@ -302,7 +302,7 @@ class SheetSync(object):
WHERE id = %(id)s
""").format(sql.SQL(", ").join(
sql.SQL("{} = {}").format(
sql.Identifier(col), sql.Placeholder(col)
sql.Identifier(col), get_column_placeholder(col)
) for col in changed
))
query(self.conn, built_query, **row)

@ -65,7 +65,8 @@ function pageSetup(isEditor) {
document.getElementById("wubloaderAdvancedInputTable").style.display = "block";
}
loadPlaylist(isEditor, data.video_start, data.video_end, data.video_quality);
const [ video_start, video_end ] = data.video_ranges === null ? [] : data.video_ranges[0];
loadPlaylist(isEditor, video_start, video_end, data.video_quality);
});
} else {
if (isEditor) {
@ -295,8 +296,8 @@ function thrimbletrimmerSubmit(state, override_changes = false) {
}
const wubData = {
video_start: start,
video_end: end,
video_ranges: [[start, end]],
video_transitions: [],
video_title: document.getElementById("VideoTitle").value,
video_description: document.getElementById("VideoDescription").value,
video_tags: tags_string_to_list(document.getElementById("VideoTags").value),
@ -328,11 +329,11 @@ function thrimbletrimmerSubmit(state, override_changes = false) {
}
console.log("Submitting", wubData);
if (!wubData.video_start) {
if (!wubData.video_ranges[0][0]) {
alert("No start time set");
return;
}
if (!wubData.video_end) {
if (!wubData.video_ranges[0][1]) {
alert("No end time set");
return;
}

@ -203,7 +203,7 @@ def update_row(ident, editor=None):
state_columns = ['state', 'uploader', 'error', 'video_link']
# These have to be set before a video can be set as 'EDITED'
non_null_columns = [
'upload_location', 'video_start', 'video_end',
'upload_location', 'video_ranges', 'video_transitions',
'video_channel', 'video_quality', 'video_title',
'video_description', 'video_tags',
]
@ -239,9 +239,24 @@ def update_row(ident, editor=None):
return 'Title may not contain a {} character'.format(char), 400
if char in new_row['video_description']:
return 'Description may not contain a {} character'.format(char), 400
# Validate start time is less than end time
if new_row['video_start'] > new_row['video_end']:
return 'Video Start must be less than Video End.', 400
# Validate and convert video ranges and transitions.
num_ranges = len(new_row['video_ranges'])
if num_ranges == 0:
return 'Ranges must contain at least one range', 400
if len(new_row['video_transitions']) != num_ranges - 1:
return 'There must be exactly {} transitions for {} ranges'.format(
num_ranges - 1, num_ranges,
)
for start, end in new_row['video_ranges']:
if start > end:
return 'Range start must be less than end', 400
# We need these to be tuples not lists for psycopg2 to do the right thing,
# but since they come in as JSON they are currently lists.
new_row['video_ranges'] = [tuple(range) for range in new_row['video_ranges']]
new_row['video_transitions'] = [
None if transition is None else tuple(transition)
for transition in new_row['video_transitions']
]
conn = app.db_manager.get_conn()
# Check a row with id = ident is in the database
@ -281,7 +296,7 @@ def update_row(ident, editor=None):
if new_row['state'] == 'EDITED':
missing = []
for column in non_null_columns:
if not new_row[column]:
if new_row[column] is None:
missing.append(column)
if missing:
return 'Fields {} must be non-null for video to be cut'.format(', '.join(missing)), 400
@ -304,7 +319,7 @@ def update_row(ident, editor=None):
AND state IN ('UNEDITED', 'EDITED', 'CLAIMED')"""
).format(sql.SQL(", ").join(
sql.SQL("{} = {}").format(
sql.Identifier(column), sql.Placeholder(column),
sql.Identifier(column), database.get_column_placeholder(column),
) for column in new_row.keys() if column not in sheet_columns
))
result = database.query(conn, build_query, id=ident, **new_row)

Loading…
Cancel
Save