diff --git a/common/common/__init__.py b/common/common/__init__.py index 3d8edb8..85529ba 100644 --- a/common/common/__init__.py +++ b/common/common/__init__.py @@ -7,7 +7,7 @@ import errno import os import random -from .segments import get_best_segments, cut_segments, parse_segment_path, SegmentInfo +from .segments import get_best_segments, fast_cut_segments, full_cut_segments, parse_segment_path, SegmentInfo from .stats import timed, PromLogCountsHandler, install_stacksampler diff --git a/common/common/segments.py b/common/common/segments.py index bbf8854..31746ed 100644 --- a/common/common/segments.py +++ b/common/common/segments.py @@ -13,6 +13,7 @@ import shutil import sys from collections import namedtuple from contextlib import closing +from tempfile import TemporaryFile import gevent from gevent import subprocess @@ -267,7 +268,7 @@ def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None): """ args = [ 'ffmpeg', - '-hide_banner', '-loglevel', 'fatal', # suppress noisy output + '-hide_banner', '-loglevel', 'error', # suppress noisy output '-i', segment.path, ] # output from ffprobe is generally already sorted but let's be paranoid, @@ -291,20 +292,31 @@ def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None): return subprocess.Popen(args, stdout=subprocess.PIPE) -def ffmpeg_cut_stdin(cut_start, cut_end, encode_args): +def ffmpeg_cut_stdin(output_file, cut_start, duration, encode_args): """Return a Popen object which is ffmpeg cutting from stdin. - This is used when doing a full cut.""" + This is used when doing a full cut. + Note the explicit output file object instead of using a pipe, + because most video formats require a seekable file. + """ args = [ 'ffmpeg', - '-hide_banner', '-loglevel', 'fatal', # suppress noisy output - '-i', '-' + '-hide_banner', '-loglevel', 'error', # suppress noisy output + '-i', '-', '-ss', cut_start, - '-to', cut_end, + '-t', duration, ] + list(encode_args) + [ - '-', # output to stdout + # We want ffmpeg to write to our tempfile, which is its stdout. + # However, it assumes that '-' means the output is not seekable. + # We trick it into understanding that its stdout is seekable by + # telling it to write to the fd via its /proc/self filename. + '/proc/self/fd/1', + # But of course, that file "already exists", so we need to give it + # permission to "overwrite" it. + '-y', ] + args = map(str, args) logging.info("Running full cut with args: {}".format(" ".join(args))) - return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE) + return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=output_file) def read_chunks(fileobj, chunk_size=16*1024): @@ -388,42 +400,40 @@ def fast_cut_segments(segments, start, end): yield chunk -def feed_input(segments, pipe): - """Write each segment's data into the given pipe in order. - This is used to provide input to ffmpeg in a full cut.""" - for segment in segments: - with open(segment.path) as f: - try: - shutil.copyfileobj(f, pipe) - except OSError as e: - # ignore EPIPE, as this just means the end cut meant we didn't need all input - if e.errno != errno.EPIPE: - raise - pipe.close() - - def full_cut_segments(segments, start, end, encode_args): # how far into the first segment to begin cut_start = max(0, (start - segments[0].start).total_seconds()) - # how much of final segment should be cut off - cut_end = max(0, (segments[-1].end - end).total_seconds()) + # duration + duration = (end - start).total_seconds() ffmpeg = None - input_feeder = None try: - ffmpeg = ffmpeg_cut_stdin(cut_start, cut_end, encode_args) - input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin) - # stream the output until it is closed - for chunk in read_chunks(ffmpeg.stdout): - yield chunk + # Most ffmpeg output formats require a seekable file. + # For the same reason, it's not safe to begin uploading until ffmpeg + # has finished. We create a temporary file for this. + tempfile = TemporaryFile() + ffmpeg = ffmpeg_cut_stdin(tempfile, cut_start, duration, encode_args) + + # stream the input + for segment in segments: + with open(segment.path) as f: + try: + shutil.copyfileobj(f, ffmpeg.stdin) + except OSError as e: + # ignore EPIPE, as this just means the end cut meant we didn't need all input + if e.errno != errno.EPIPE: + raise + ffmpeg.stdin.close() + # check if any errors occurred in input writing, or if ffmpeg exited non-success. if ffmpeg.wait() != 0: raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode)) - input_feeder.get() # re-raise any errors from feed_input() + + # Now actually yield the resulting file + for chunk in read_chunks(tempfile): + yield chunk finally: # if something goes wrong, try to clean up ignoring errors - if input_feeder is not None: - input_feeder.kill() if ffmpeg is not None and ffmpeg.poll() is None: for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close): try: diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py index 7a38379..dc8fa76 100644 --- a/cutter/cutter/main.py +++ b/cutter/cutter/main.py @@ -146,7 +146,7 @@ class Cutter(object): UPDATE events SET error = %s WHERE id = %s AND state = 'EDITED' AND error IS NULL - """, id=candidate.id, error=error) + """, candidate.id, error) except Exception: self.logger.exception("Failed to set error for candidate {}, ignoring".format(format_job(candidate))) self.refresh_conn() @@ -350,7 +350,7 @@ class Cutter(object): # from requests.post() are not recoverable. try: - video_id = upload_backend.upload_video( + video_id, video_link = upload_backend.upload_video( title=( "{} - {}".format(self.title_header, job.video_title) if self.title_header else job.video_title @@ -418,8 +418,7 @@ class Cutter(object): # Success! Set TRANSCODING or DONE and clear any previous error. success_state = 'TRANSCODING' if upload_backend.needs_transcode else 'DONE' - link = "https://youtu.be/{}".format(video_id) - if not set_row(state=success_state, video_id=video_id, video_link=link, error=None): + if not set_row(state=success_state, video_id=video_id, video_link=video_link, error=None): # This will result in it being stuck in FINALIZING, and an operator will need to go # confirm it was really uploaded. raise JobConsistencyError( @@ -427,7 +426,7 @@ class Cutter(object): .format(job.id, self.name, success_state) ) - self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), link)) + self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), video_link)) videos_uploaded.labels(video_channel=job.video_channel, video_quality=job.video_quality, upload_location=job.upload_location).inc() @@ -618,12 +617,12 @@ def main( backend_type = backend_config.pop('type') no_transcode_check = backend_config.pop('no_transcode_check', False) cut_type = backend_config.pop('cut_type', 'fast') - if type == 'youtube': + if backend_type == 'youtube': backend_type = Youtube - elif type == 'local': + elif backend_type == 'local': backend_type = Local else: - raise ValueError("Unknown upload backend type: {!r}".format(type)) + raise ValueError("Unknown upload backend type: {!r}".format(backend_type)) backend = backend_type(credentials, **backend_config) if cut_type == 'fast': # mark for fast cut by clearing encoding settings diff --git a/cutter/cutter/upload_backends.py b/cutter/cutter/upload_backends.py index bd3a056..7d8e38d 100644 --- a/cutter/cutter/upload_backends.py +++ b/cutter/cutter/upload_backends.py @@ -171,11 +171,13 @@ class Local(UploadBackend): def upload_video(self, title, description, tags, data): video_id = uuid.uuid4() # make title safe by removing offending characters, replacing with '-' - title = re.sub('[^A-Za-z0-9_]', '-', title) - filename = '{}-{}.ts'.format(title, video_id) # TODO with re-encoding, this ext must change + safe_title = re.sub('[^A-Za-z0-9_]', '-', title) + # If fast cut enabled, use .ts, otherwise use .mp4 + ext = 'ts' if self.encoding_settings is None else 'mp4' + filename = '{}-{}.{}'.format(safe_title, video_id, ext) filepath = os.path.join(self.path, filename) if self.write_info: - with open(os.path.join(self.path, '{}-{}.json'.format(title, video_id))) as f: + with open(os.path.join(self.path, '{}-{}.json'.format(safe_title, video_id)), 'w') as f: f.write(json.dumps({ 'title': title, 'description': description,