From 18068cce2ae9b5b4211a2191f1d0ea3f9d9a8454 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 10 Jun 2019 04:28:31 -0700 Subject: [PATCH] cutter: Outline of how main cutter run loop works This commit only lays out the main loop, showing the high-level flow and defining shared utilities. This is for clarity. The actual methods that do the work will be implemented separately. --- cutter/cutter/main.py | 90 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 86 insertions(+), 4 deletions(-) diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py index 469da0e..7232c49 100644 --- a/cutter/cutter/main.py +++ b/cutter/cutter/main.py @@ -2,6 +2,8 @@ import json import logging import signal +import socket +from collections import namedtuple import gevent.backdoor import gevent.event @@ -13,21 +15,96 @@ from common.database import DBManager, query from .youtube import Youtube +# A list of all the DB column names in CutJob +CUT_JOB_PARAMS = [ + "category", + "allow_holes", + "uploader_whitelist", + "upload_location", + "video_start", + "video_end", + "video_title", + "video_description", + "video_channel", + "video_quality", +] +CutJob = namedtuple('CutJob', [ + "id", + # the list of segments as returned by get_best_segments() + "segments", + # params which map directly from DB columns +] + CUT_JOB_PARAMS) + + +def format_job(job): + """Convert candidate row or CutJob to human-readable string""" + return "{job.id}({start}/{duration}s {job.video_title!r})".format( + job=job, + start=job.video_start.isoformat(), + duration=(job.video_end - job.video_start).total_seconds(), + ) + + +class CandidateGone(Exception): + """Exception indicating a job candidate is no longer available""" + + class Cutter(object): - def __init__(self, youtube, conn, stop): + def __init__(self, youtube, conn, stop, name, segments_path): """youtube is an authenticated and initialized youtube api client. Conn is a database connection. Stop is an Event triggering graceful shutdown when set. 
+ Name is this uploader's unique name. + Segments path is where to look for segments. """ + self.name = name self.youtube = youtube self.conn = conn self.stop = stop + self.segments_path = segments_path self.logger = logging.getLogger(type(self).__name__) + def wait(self, interval): + """Wait for INTERVAL with jitter, unless we're stopping""" + self.stop.wait(common.jitter(interval)) def run(self): + # clean up any potential bad state from unclean shutdown + self.rollback_all_owned() + # main loop - note that the sub-functions are responsible for error handling. + # any unhandled errors will cause the process to restart and clean up as per rollback_all_owned(). while not self.stop.is_set(): - pass + job = self.find_candidate() + try: + self.claim_job(job) + except CandidateGone: + continue + self.cut_job(job) + + def find_candidate(self): + """List EDITED events and find one at random which we have all segments for + (or for which allow_holes is true), returning a CutJob. + Polls until one is available. + """ + raise NotImplementedError + + def claim_job(self, job): + """Update event in DB to say we're working on it. + If someone beat us to it, or it's changed, raise CandidateGone.""" + # We need to verify all relevant cut params are unchanged, in case they + # were updated between verifying the candidate and now. + raise NotImplementedError + + def cut_job(self, job): + """Perform the actual cut and upload, taking the job through FINALIZING and into + TRANSCODING or DONE. 
+ """ + raise NotImplementedError + + def rollback_all_owned(self): + """Roll back any in-progress jobs that claim to be owned by us, + to recover from an unclean shutdown.""" + raise NotImplementedError class TranscodeChecker(object): @@ -94,12 +171,14 @@ class TranscodeChecker(object): return result.rowcount -def main(dbconnect, youtube_creds_file, metrics_port=8003, backdoor_port=0): +def main(dbconnect, youtube_creds_file, name=None, base_dir=".", metrics_port=8003, backdoor_port=0): """dbconnect should be a postgres connection string, which is either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE youtube_creds_file should be a json file containing keys 'client_id', 'client_secret' and 'refresh_token'. + + name defaults to hostname. """ common.PromLogCountsHandler.install() common.install_stacksampler() @@ -108,6 +187,9 @@ def main(dbconnect, youtube_creds_file, metrics_port=8003, backdoor_port=0): if backdoor_port: gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() + if name is None: + name = socket.gethostname() + stop = gevent.event.Event() gevent.signal(signal.SIGTERM, stop.set) # shut down on sigterm @@ -123,7 +205,7 @@ def main(dbconnect, youtube_creds_file, metrics_port=8003, backdoor_port=0): client_secret=youtube_creds['client_secret'], refresh_token=youtube_creds['refresh_token'], ) - cutter = Cutter(youtube, dbmanager.get_conn(), stop) + cutter = Cutter(youtube, dbmanager.get_conn(), stop, name, base_dir) transcode_checker = TranscodeChecker(youtube, dbmanager.get_conn(), stop) jobs = [ gevent.spawn(cutter.run),