Merge pull request #114 from ekimekim/mike/fixes

Grab-bag of cutter fixes
pull/113/head
Mike Lang 5 years ago committed by GitHub
commit e435abf72e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -7,7 +7,7 @@ import errno
import os import os
import random import random
from .segments import get_best_segments, cut_segments, parse_segment_path, SegmentInfo from .segments import get_best_segments, fast_cut_segments, full_cut_segments, parse_segment_path, SegmentInfo
from .stats import timed, PromLogCountsHandler, install_stacksampler from .stats import timed, PromLogCountsHandler, install_stacksampler

@ -13,6 +13,7 @@ import shutil
import sys import sys
from collections import namedtuple from collections import namedtuple
from contextlib import closing from contextlib import closing
from tempfile import TemporaryFile
import gevent import gevent
from gevent import subprocess from gevent import subprocess
@ -267,7 +268,7 @@ def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None):
""" """
args = [ args = [
'ffmpeg', 'ffmpeg',
'-hide_banner', '-loglevel', 'fatal', # suppress noisy output '-hide_banner', '-loglevel', 'error', # suppress noisy output
'-i', segment.path, '-i', segment.path,
] ]
# output from ffprobe is generally already sorted but let's be paranoid, # output from ffprobe is generally already sorted but let's be paranoid,
@ -291,20 +292,31 @@ def ffmpeg_cut_segment(segment, cut_start=None, cut_end=None):
return subprocess.Popen(args, stdout=subprocess.PIPE) return subprocess.Popen(args, stdout=subprocess.PIPE)
def ffmpeg_cut_stdin(cut_start, cut_end, encode_args): def ffmpeg_cut_stdin(output_file, cut_start, duration, encode_args):
"""Return a Popen object which is ffmpeg cutting from stdin. """Return a Popen object which is ffmpeg cutting from stdin.
This is used when doing a full cut.""" This is used when doing a full cut.
Note the explicit output file object instead of using a pipe,
because most video formats require a seekable file.
"""
args = [ args = [
'ffmpeg', 'ffmpeg',
'-hide_banner', '-loglevel', 'fatal', # suppress noisy output '-hide_banner', '-loglevel', 'error', # suppress noisy output
'-i', '-' '-i', '-',
'-ss', cut_start, '-ss', cut_start,
'-to', cut_end, '-t', duration,
] + list(encode_args) + [ ] + list(encode_args) + [
'-', # output to stdout # We want ffmpeg to write to our tempfile, which is its stdout.
# However, it assumes that '-' means the output is not seekable.
# We trick it into understanding that its stdout is seekable by
# telling it to write to the fd via its /proc/self filename.
'/proc/self/fd/1',
# But of course, that file "already exists", so we need to give it
# permission to "overwrite" it.
'-y',
] ]
args = map(str, args)
logging.info("Running full cut with args: {}".format(" ".join(args))) logging.info("Running full cut with args: {}".format(" ".join(args)))
return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE) return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=output_file)
def read_chunks(fileobj, chunk_size=16*1024): def read_chunks(fileobj, chunk_size=16*1024):
@ -388,42 +400,40 @@ def fast_cut_segments(segments, start, end):
yield chunk yield chunk
def feed_input(segments, pipe):
"""Write each segment's data into the given pipe in order.
This is used to provide input to ffmpeg in a full cut."""
for segment in segments:
with open(segment.path) as f:
try:
shutil.copyfileobj(f, pipe)
except OSError as e:
# ignore EPIPE, as this just means the end cut meant we didn't need all input
if e.errno != errno.EPIPE:
raise
pipe.close()
def full_cut_segments(segments, start, end, encode_args): def full_cut_segments(segments, start, end, encode_args):
# how far into the first segment to begin # how far into the first segment to begin
cut_start = max(0, (start - segments[0].start).total_seconds()) cut_start = max(0, (start - segments[0].start).total_seconds())
# how much of final segment should be cut off # duration
cut_end = max(0, (segments[-1].end - end).total_seconds()) duration = (end - start).total_seconds()
ffmpeg = None ffmpeg = None
input_feeder = None
try: try:
ffmpeg = ffmpeg_cut_stdin(cut_start, cut_end, encode_args) # Most ffmpeg output formats require a seekable file.
input_feeder = gevent.spawn(feed_input, segments, ffmpeg.stdin) # For the same reason, it's not safe to begin uploading until ffmpeg
# stream the output until it is closed # has finished. We create a temporary file for this.
for chunk in read_chunks(ffmpeg.stdout): tempfile = TemporaryFile()
yield chunk ffmpeg = ffmpeg_cut_stdin(tempfile, cut_start, duration, encode_args)
# stream the input
for segment in segments:
with open(segment.path) as f:
try:
shutil.copyfileobj(f, ffmpeg.stdin)
except OSError as e:
# ignore EPIPE, as this just means the end cut meant we didn't need all input
if e.errno != errno.EPIPE:
raise
ffmpeg.stdin.close()
# check if any errors occurred in input writing, or if ffmpeg exited non-success. # check if any errors occurred in input writing, or if ffmpeg exited non-success.
if ffmpeg.wait() != 0: if ffmpeg.wait() != 0:
raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode)) raise Exception("Error while streaming cut: ffmpeg exited {}".format(ffmpeg.returncode))
input_feeder.get() # re-raise any errors from feed_input()
# Now actually yield the resulting file
for chunk in read_chunks(tempfile):
yield chunk
finally: finally:
# if something goes wrong, try to clean up ignoring errors # if something goes wrong, try to clean up ignoring errors
if input_feeder is not None:
input_feeder.kill()
if ffmpeg is not None and ffmpeg.poll() is None: if ffmpeg is not None and ffmpeg.poll() is None:
for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close): for action in (ffmpeg.kill, ffmpeg.stdin.close, ffmpeg.stdout.close):
try: try:

@ -146,7 +146,7 @@ class Cutter(object):
UPDATE events UPDATE events
SET error = %s SET error = %s
WHERE id = %s AND state = 'EDITED' AND error IS NULL WHERE id = %s AND state = 'EDITED' AND error IS NULL
""", id=candidate.id, error=error) """, candidate.id, error)
except Exception: except Exception:
self.logger.exception("Failed to set error for candidate {}, ignoring".format(format_job(candidate))) self.logger.exception("Failed to set error for candidate {}, ignoring".format(format_job(candidate)))
self.refresh_conn() self.refresh_conn()
@ -350,7 +350,7 @@ class Cutter(object):
# from requests.post() are not recoverable. # from requests.post() are not recoverable.
try: try:
video_id = upload_backend.upload_video( video_id, video_link = upload_backend.upload_video(
title=( title=(
"{} - {}".format(self.title_header, job.video_title) "{} - {}".format(self.title_header, job.video_title)
if self.title_header else job.video_title if self.title_header else job.video_title
@ -418,8 +418,7 @@ class Cutter(object):
# Success! Set TRANSCODING or DONE and clear any previous error. # Success! Set TRANSCODING or DONE and clear any previous error.
success_state = 'TRANSCODING' if upload_backend.needs_transcode else 'DONE' success_state = 'TRANSCODING' if upload_backend.needs_transcode else 'DONE'
link = "https://youtu.be/{}".format(video_id) if not set_row(state=success_state, video_id=video_id, video_link=video_link, error=None):
if not set_row(state=success_state, video_id=video_id, video_link=link, error=None):
# This will result in it being stuck in FINALIZING, and an operator will need to go # This will result in it being stuck in FINALIZING, and an operator will need to go
# confirm it was really uploaded. # confirm it was really uploaded.
raise JobConsistencyError( raise JobConsistencyError(
@ -427,7 +426,7 @@ class Cutter(object):
.format(job.id, self.name, success_state) .format(job.id, self.name, success_state)
) )
self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), link)) self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), video_link))
videos_uploaded.labels(video_channel=job.video_channel, videos_uploaded.labels(video_channel=job.video_channel,
video_quality=job.video_quality, video_quality=job.video_quality,
upload_location=job.upload_location).inc() upload_location=job.upload_location).inc()
@ -618,12 +617,12 @@ def main(
backend_type = backend_config.pop('type') backend_type = backend_config.pop('type')
no_transcode_check = backend_config.pop('no_transcode_check', False) no_transcode_check = backend_config.pop('no_transcode_check', False)
cut_type = backend_config.pop('cut_type', 'fast') cut_type = backend_config.pop('cut_type', 'fast')
if type == 'youtube': if backend_type == 'youtube':
backend_type = Youtube backend_type = Youtube
elif type == 'local': elif backend_type == 'local':
backend_type = Local backend_type = Local
else: else:
raise ValueError("Unknown upload backend type: {!r}".format(type)) raise ValueError("Unknown upload backend type: {!r}".format(backend_type))
backend = backend_type(credentials, **backend_config) backend = backend_type(credentials, **backend_config)
if cut_type == 'fast': if cut_type == 'fast':
# mark for fast cut by clearing encoding settings # mark for fast cut by clearing encoding settings

@ -171,11 +171,13 @@ class Local(UploadBackend):
def upload_video(self, title, description, tags, data): def upload_video(self, title, description, tags, data):
video_id = uuid.uuid4() video_id = uuid.uuid4()
# make title safe by removing offending characters, replacing with '-' # make title safe by removing offending characters, replacing with '-'
title = re.sub('[^A-Za-z0-9_]', '-', title) safe_title = re.sub('[^A-Za-z0-9_]', '-', title)
filename = '{}-{}.ts'.format(title, video_id) # TODO with re-encoding, this ext must change # If fast cut enabled, use .ts, otherwise use .mp4
ext = 'ts' if self.encoding_settings is None else 'mp4'
filename = '{}-{}.{}'.format(safe_title, video_id, ext)
filepath = os.path.join(self.path, filename) filepath = os.path.join(self.path, filename)
if self.write_info: if self.write_info:
with open(os.path.join(self.path, '{}-{}.json'.format(title, video_id))) as f: with open(os.path.join(self.path, '{}-{}.json'.format(safe_title, video_id)), 'w') as f:
f.write(json.dumps({ f.write(json.dumps({
'title': title, 'title': title,
'description': description, 'description': description,

Loading…
Cancel
Save