diff --git a/common/common/segments.py b/common/common/segments.py index 26c36b4..0b5e13b 100644 --- a/common/common/segments.py +++ b/common/common/segments.py @@ -295,8 +295,9 @@ def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None): def ffmpeg_cut_stdin(output_file, cut_start, duration, encode_args): """Return a Popen object which is ffmpeg cutting from stdin. This is used when doing a full cut. - Note the explicit output file object instead of using a pipe, - because most video formats require a seekable file. + If output_file is not subprocess.PIPE, + uses explicit output file object instead of using a pipe, + because some video formats require a seekable file. """ args = [ 'ffmpeg', @@ -304,16 +305,20 @@ def ffmpeg_cut_stdin(output_file, cut_start, duration, encode_args): '-i', '-', '-ss', cut_start, '-t', duration, - ] + list(encode_args) + [ - # We want ffmpeg to write to our tempfile, which is its stdout. - # However, it assumes that '-' means the output is not seekable. - # We trick it into understanding that its stdout is seekable by - # telling it to write to the fd via its /proc/self filename. - '/proc/self/fd/1', - # But of course, that file "already exists", so we need to give it - # permission to "overwrite" it. - '-y', - ] + ] + list(encode_args) + if output_file is subprocess.PIPE: + args.append('-') # output to stdout + else: + args += [ + # We want ffmpeg to write to our tempfile, which is its stdout. + # However, it assumes that '-' means the output is not seekable. + # We trick it into understanding that its stdout is seekable by + # telling it to write to the fd via its /proc/self filename. + '/proc/self/fd/1', + # But of course, that file "already exists", so we need to give it + # permission to "overwrite" it. 
+ '-y', + ] args = map(str, args) logging.info("Running full cut with args: {}".format(" ".join(args))) return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=output_file) @@ -401,41 +406,67 @@ def fast_cut_segments(segments, start, end): yield chunk -@timed('cut', type='full', normalize=lambda _, segments, start, end, encode_args: (end - start).total_seconds()) -def full_cut_segments(segments, start, end, encode_args): +def feed_input(segments, pipe): + """Write each segment's data into the given pipe in order. + This is used to provide input to ffmpeg in a full cut.""" + for segment in segments: + with open(segment.path) as f: + try: + shutil.copyfileobj(f, pipe) + except OSError as e: + # ignore EPIPE, as this just means the end cut meant we didn't need all input + if e.errno != errno.EPIPE: + raise + pipe.close() + + +@timed('cut', + type=lambda _, segments, start, end, encode_args, stream=False: "full-streamed" if stream else "full-buffered", + normalize=lambda _, segments, start, end, *a, **k: (end - start).total_seconds(), +) +def full_cut_segments(segments, start, end, encode_args, stream=False): + """If stream=True, assume encode_args gives a streamable format, + and begin returning output immediately instead of waiting for ffmpeg to finish + and buffering to disk.""" # how far into the first segment to begin cut_start = max(0, (start - segments[0].start).total_seconds()) # duration duration = (end - start).total_seconds() ffmpeg = None + input_feeder = None try: - # Most ffmpeg output formats require a seekable file. - # For the same reason, it's not safe to begin uploading until ffmpeg - # has finished. We create a temporary file for this. - tempfile = TemporaryFile() + + if stream: + # When streaming, we can just use a pipe + tempfile = subprocess.PIPE + else: + # Some ffmpeg output formats require a seekable file. + # For the same reason, it's not safe to begin uploading until ffmpeg + # has finished. We create a temporary file for this. 
+ tempfile = TemporaryFile() + ffmpeg = ffmpeg_cut_stdin(tempfile, cut_start, duration, encode_args) + input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin) - # stream the input - for segment in segments: - with open(segment.path) as f: - try: - shutil.copyfileobj(f, ffmpeg.stdin) - except OSError as e: - # ignore EPIPE, as this just means the end cut meant we didn't need all input - if e.errno != errno.EPIPE: - raise - ffmpeg.stdin.close() + # When streaming, we can return data as it is available + if stream: + for chunk in read_chunks(ffmpeg.stdout): + yield chunk # check if any errors occurred in input writing, or if ffmpeg exited non-success. if ffmpeg.wait() != 0: raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode)) + input_feeder.get() # re-raise any errors from feed_input() - # Now actually yield the resulting file - for chunk in read_chunks(tempfile): - yield chunk + # When not streaming, we can only return the data once ffmpeg has exited + if not stream: + for chunk in read_chunks(tempfile): + yield chunk finally: # if something goes wrong, try to clean up ignoring errors + if input_feeder is not None: + input_feeder.kill() if ffmpeg is not None and ffmpeg.poll() is None: for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close): try: diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py index bbbe04d..65dd889 100644 --- a/cutter/cutter/main.py +++ b/cutter/cutter/main.py @@ -294,8 +294,14 @@ class Cutter(object): self.logger.debug("No encoding settings, using fast cut") cut = fast_cut_segments(job.segments, job.video_start, job.video_end) else: - self.logger.debug("Using encoding settings for cut: {}".format(upload_backend.encoding_settings)) - cut = full_cut_segments(job.segments, job.video_start, job.video_end, upload_backend.encoding_settings) + self.logger.debug("Using encoding settings for {} cut: {}".format( + "streamable" if upload_backend.encoding_streamable else "non-streamable", + 
+ upload_backend.encoding_settings, + )) + cut = full_cut_segments( + job.segments, job.video_start, job.video_end, + upload_backend.encoding_settings, stream=upload_backend.encoding_streamable, + ) # This flag tracks whether we've told requests to finalize the upload, # and serves to detect whether errors from the request call are recoverable. diff --git a/cutter/cutter/upload_backends.py b/cutter/cutter/upload_backends.py index 734c4a6..0ffc7be 100644 --- a/cutter/cutter/upload_backends.py +++ b/cutter/cutter/upload_backends.py @@ -67,12 +67,17 @@ class UploadBackend(object): under the 'encoding_settings' attribute. If this is None, instead uses the 'fast cut' strategy where nothing is transcoded. + + In addition, if the output format doesn't need a seekable file, + you should set encoding_streamable = True so we know we can stream the output directly. """ needs_transcode = False - # reasonable default if settings don't otherwise matter - encoding_settings = ['-f', 'mp4'] + # reasonable default if settings don't otherwise matter: + # high-quality mpegts, without wasting too much cpu on encoding + encoding_settings = ['-c:v', 'libx264', '-preset', 'ultrafast', '-crf', '0', '-f', 'mpegts'] + encoding_streamable = True def upload_video(self, title, description, tags, data): raise NotImplementedError @@ -92,10 +97,14 @@ class Youtube(UploadBackend): language: The language code to describe all videos as. Default is "en", ie. English. Set to null to not set. + use_yt_recommended_encoding: + Default False. If True, use the ffmpeg settings that Youtube recommends for + fast processing once uploaded. We suggest not bothering, as it doesn't appear + to make much difference. 
""" needs_transcode = True - encoding_settings = [ + recommended_settings = [ # Youtube's recommended settings: '-codec:v', 'libx264', # Make the video codec x264 '-crf', '21', # Set the video quality, this produces the bitrate range that YT likes @@ -108,7 +117,7 @@ class Youtube(UploadBackend): '-movflags', 'faststart', # put MOOV atom at the front of the file, as requested ] - def __init__(self, credentials, hidden=False, category_id=23, language="en"): + def __init__(self, credentials, hidden=False, category_id=23, language="en", use_yt_recommended_encoding=False): self.logger = logging.getLogger(type(self).__name__) self.client = GoogleAPIClient( credentials['client_id'], @@ -118,6 +127,9 @@ class Youtube(UploadBackend): self.hidden = hidden self.category_id = category_id self.language = language + if use_yt_recommended_encoding: + self.encoding_settings = self.recommended_settings + self.encoding_streamable = False def upload_video(self, title, description, tags, data): json = { diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index 78fdce9..b74d7b7 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -237,7 +237,6 @@ def cut(channel, quality): Even if holes are allowed, a 406 may result if the resulting video would be empty. type: One of "fast" or "full". Default to "fast". A fast cut is much faster but minor artifacting may be present near the start and end. - A fast cut is encoded as MPEG-TS, a full as mp4. 
""" start = dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None end = dateutil.parse_utc_only(request.args['end']) if 'end' in request.args else None @@ -272,8 +271,9 @@ def cut(channel, quality): if type == 'fast': return Response(fast_cut_segments(segments, start, end), mimetype='video/MP2T') elif type == 'full': - # output as mp4 with no more specific encoding args - return Response(full_cut_segments(segments, start, end, ['-f', 'mp4']), mimetype='video/mp4') + # output as high-quality mpegts, without wasting too much cpu on encoding + encoding_args = ['-c:v', 'libx264', '-preset', 'ultrafast', '-crf', '0', '-f', 'mpegts'] + return Response(full_cut_segments(segments, start, end, encoding_args, stream=True), mimetype='video/MP2T') else: return "Unknown type {!r}. Must be 'fast' or 'full'.".format(type), 400