cutter: Outline of how main cutter run loop works

This commit only lays out the main loop, showing the high-level flow
and defining shared utilities. This is for clarity.

The actual methods that do the work will be implemented seperately.
pull/47/head
Mike Lang 6 years ago committed by Mike Lang
parent 80c1a66aa0
commit ae809c696c

@ -2,6 +2,8 @@
import json
import logging
import signal
import socket
from collections import namedtuple
import gevent.backdoor
import gevent.event
@ -13,21 +15,96 @@ from common.database import DBManager, query
from .youtube import Youtube
# A list of all the DB column names in CutJob
CUT_JOB_PARAMS = [
"category",
"allow_holes",
"uploader_whitelist",
"upload_location",
"video_start",
"video_end",
"video_title",
"video_description",
"video_channel",
"video_quality",
]
CutJob = namedtuple('CutJob', [
"id",
# the list of segments as returned by get_best_segments()
"segments",
# params which map directly from DB columns
] + CUT_JOB_PARAMS)
def format_job(job):
"""Convert candidate row or CutJob to human-readable string"""
return "{job.id}({start}/{duration}s {job.video_title!r})".format(
job=job,
start=job.video_start.isoformat(),
duration=(job.video_end - job.video_start).total_seconds(),
)
class CandidateGone(Exception):
"""Exception indicating a job candidate is no longer available"""
class Cutter(object):
def __init__(self, youtube, conn, stop):
def __init__(self, youtube, conn, stop, name, segments_path):
"""youtube is an authenticated and initialized youtube api client.
Conn is a database connection.
Stop is an Event triggering graceful shutdown when set.
Name is this uploader's unique name.
Segments path is where to look for segments.
"""
self.name = name
self.youtube = youtube
self.conn = conn
self.stop = stop
self.segments_path = segments_path
self.logger = logging.getLogger(type(self).__name__)
def wait(self, interval):
"""Wait for INTERVAL with jitter, unless we're stopping"""
self.stop.wait(common.jitter(interval))
def run(self):
# clean up any potential bad state from unclean shutdown
self.rollback_all_owned()
# main loop - note that the sub-functions are responsible for error handling.
# any unhandled errors will cause the process to restart and clean up as per rollback_all_owned().
while not self.stop.is_set():
pass
job = self.find_candidate()
try:
self.claim_job(job)
except CandidateGone:
continue
self.cut_job(job)
def find_candidate(self):
"""List EDITED events and find one at random which we have all segments for
(or for which allow_holes is true), returning a CutJob.
Polls until one is available.
"""
raise NotImplementedError
def claim_job(self, job):
"""Update event in DB to say we're working on it.
If someone beat us to it, or it's changed, raise CandidateGone."""
# We need to verify all relevant cut params are unchanged, in case they
# were updated between verifying the candidate and now.
raise NotImplementedError
def cut_job(self, job):
"""Perform the actual cut and upload, taking the job through FINALIZING and into
TRANSCODING or DONE.
"""
raise NotImplementedError
def rollback_all_owned(self):
"""Roll back any in-progress jobs that claim to be owned by us,
to recover from an unclean shutdown."""
raise NotImplementedError
class TranscodeChecker(object):
@ -94,12 +171,14 @@ class TranscodeChecker(object):
return result.rowcount
def main(dbconnect, youtube_creds_file, metrics_port=8003, backdoor_port=0):
def main(dbconnect, youtube_creds_file, name=None, base_dir=".", metrics_port=8003, backdoor_port=0):
"""dbconnect should be a postgres connection string, which is either a space-separated
list of key=value pairs, or a URI like:
postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE
youtube_creds_file should be a json file containing keys 'client_id', 'client_secret' and 'refresh_token'.
name defaults to hostname.
"""
common.PromLogCountsHandler.install()
common.install_stacksampler()
@ -108,6 +187,9 @@ def main(dbconnect, youtube_creds_file, metrics_port=8003, backdoor_port=0):
if backdoor_port:
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
if name is None:
name = socket.gethostname()
stop = gevent.event.Event()
gevent.signal(signal.SIGTERM, stop.set) # shut down on sigterm
@ -123,7 +205,7 @@ def main(dbconnect, youtube_creds_file, metrics_port=8003, backdoor_port=0):
client_secret=youtube_creds['client_secret'],
refresh_token=youtube_creds['refresh_token'],
)
cutter = Cutter(youtube, dbmanager.get_conn(), stop)
cutter = Cutter(youtube, dbmanager.get_conn(), stop, name, base_dir)
transcode_checker = TranscodeChecker(youtube, dbmanager.get_conn(), stop)
jobs = [
gevent.spawn(cutter.run),

Loading…
Cancel
Save