diff --git a/wubloader/heartbeat.py b/wubloader/heartbeat.py
index 8a58843..10129b1 100644
--- a/wubloader/heartbeat.py
+++ b/wubloader/heartbeat.py
@@ -1,4 +1,8 @@
 
+import logging
+import time
+
+import gevent
 
 
 class Heartbeat(object):
@@ -8,9 +12,14 @@ class Heartbeat(object):
     instance, last update time (in epoch)
     This instance will be added if it doesn't already exist.
 
+    Also keeps track of what other bots are alive in self.alive
+
     This class is a context manager and will run until exit.
     """
+    # How often to refresh our heartbeat
     HEARTBEAT_INTERVAL = 1
+    # How old other bots heartbeat needs to be to consider them dead
+    HEARTBEAT_THRESHOLD = 10
 
     def __init__(self, sheet, name, group):
         self.sheet = sheet
@@ -18,7 +27,8 @@ class Heartbeat(object):
         self.stopped = gevent.event.Event()
 
     def __enter__(self):
-        self.worker = group.spawn(self._run)
+        self.alive = self._get_alive() # do one now to prevent a race where it's read before it's written
+        self.worker = self.group.spawn(self._run)
         return self
 
     def __exit__(self, *exc_info):
@@ -31,5 +41,29 @@ class Heartbeat(object):
         row = self.sheet.append(id=self.name, heartbeat=time.time())
         while not self.stopped.wait(self.HEARTBEAT_INTERVAL):
             row.update(heartbeat=time.time())
+            self.alive = self._get_alive()
 
+        # clear the heartbeat to indicate we're stopping
         row.update(heartbeat="")
+
+    def _get_alive(self):
+        """Return the set of row ids whose heartbeat is at most HEARTBEAT_THRESHOLD seconds old."""
+        alive = set()
+        for row in self.sheet:
+            if not row.id:
+                continue
+            if row.heartbeat in ("", None):
+                # A blank heartbeat is what _run() writes on a clean stop,
+                # so that bot is simply not alive - nothing worth warning about.
+                continue
+            try:
+                heartbeat = float(row.heartbeat)
+            except ValueError:
+                logging.warning("Invalid heartbeat value for row {}: {!r}".format(row, row.heartbeat))
+                continue
+            age = time.time() - heartbeat
+            if age > self.HEARTBEAT_THRESHOLD:
+                logging.debug("Considering {} dead: heartbeat too old at {} sec".format(row.id, age))
+                continue
+            alive.add(row.id)
+        return alive
diff --git a/wubloader/job.py b/wubloader/job.py
new file mode 100644
index 0000000..116d8c2
--- /dev/null
+++ b/wubloader/job.py
@@ -0,0 +1,114 @@
+
+import logging
+
+from . import states
+
+
+class Job(object):
+    """A job wraps a row and represents a video cutting job to do."""
+
+    # How often to check if someone else has claimed a row out from underneath us.
+    OWNERSHIP_CHECK_INTERVAL = 1
+
+    def __init__(self, wubloader, is_chunk, sheet, row):
+        self.wubloader = wubloader
+        if is_chunk:
+            self.job_type = 'chunk'
+        elif row.state in states.FLOWS['draft']:
+            self.job_type = 'draft'
+        else:
+            assert row.state in states.FLOWS['publish']
+            self.job_type = 'publish'
+        self.sheet = sheet
+        self.row = row
+        self.worker = None # greenlet doing the work; only set once process() is called
+
+    @property
+    def priority(self):
+        """User-set priority is most important, followed by type, then earliest first."""
+        type_priority = ['chunk', 'draft', 'publish'] # low to high priority
+        return (
+            getattr(self.row, 'priority', 0), # chunks don't have priority, default to 0
+            type_priority.index(self.job_type),
+            -self.sheet.id, # sheet index, low number is high priority
+            -self.row.index, # row index, low number is high priority
+        )
+
+    @property
+    def uploader(self):
+        """A processed uploader check that ignores dead bots"""
+        return self.row.uploader if self.row.uploader in self.wubloader.heartbeat.alive else ""
+
+    @property
+    def excluded(self):
+        """Bots that may not claim this row. NOTE: All lowercase."""
+        if not self.row.excluded.strip():
+            return []
+        return [name.strip().lower() for name in self.row.excluded.split(',')]
+
+    @property
+    def start_time(self):
+        """The row's start_time parsed by parse_bustime(). Raises ValueError("Start time: ...") if invalid."""
+        try:
+            return parse_bustime(self.wubloader.bustime_base, self.row.start_time)
+        except ValueError as e:
+            raise ValueError("Start time: {}".format(e))
+
+    @property
+    def end_time(self):
+        """The row's end_time parsed by parse_bustime(). Raises ValueError("End time: ...") if invalid."""
+        try:
+            return parse_bustime(self.wubloader.bustime_base, self.row.end_time)
+        except ValueError as e:
+            raise ValueError("End time: {}".format(e))
+
+    @property
+    def duration(self):
+        """end_time minus start_time. Raises ValueError if either time fails to parse."""
+        return self.end_time - self.start_time
+
+    def cancel(self):
+        """Cancel job that is currently being processed, setting it back to its starting state."""
+        if self.worker is not None and not self.worker.ready():
+            # By setting uploader to blank, the watchdog will stop the in-progress job.
+            self.row.update(state=states.FLOWS[self.job_type][0], uploader="")
+
+    def process(self):
+        """Call this to perform the job."""
+        # We do the actual work in a separate greenlet so we can easily cancel it.
+        self.worker = self.wubloader.group.spawn(self._process)
+        # While that does the real work, we poll the uploader field to check no-one else has stolen it.
+        while not self.worker.ready():
+            row = self.row.refresh()
+            if row is None or row.uploader != self.row.uploader:
+                # Our row's been stolen, cancelled, or just plain lost.
+                # Abort with no rollback - let them have it.
+                logging.warning("Job {} aborted: Row {} is {}".format(self, self.row,
+                    "gone" if row is None
+                    else "cancelled" if row.uploader == ""
+                    else "claimed by {}".format(row.uploader)
+                ))
+                self.worker.kill(block=True)
+                return
+            # Sleep until either worker is done or interval has passed
+            self.worker.join(self.OWNERSHIP_CHECK_INTERVAL)
+
+    def _process(self):
+        """Does the actual cutting work. You should call process() instead."""
+        # TODO
+
+
+def parse_bustime(base, value):
+    """Return base plus the number of seconds given by a "HH:MM" or "HH:MM:SS" string. Raises ValueError if malformed."""
+    parts = value.strip().split(':')
+    if len(parts) == 2:
+        hours = int(parts[0])
+        mins = float(parts[1])
+        secs = 0
+    elif len(parts) == 3:
+        hours = int(parts[0])
+        mins = int(parts[1])
+        secs = float(parts[2])
+    else:
+        raise ValueError("Bad format: Must be HH:MM or HH:MM:SS")
+    return base + hours * 3600 + mins * 60 + secs
diff --git a/wubloader/main.py b/wubloader/main.py
index 2e12b16..7363884 100644
--- a/wubloader/main.py
+++ b/wubloader/main.py
@@ -1,4 +1,5 @@
 
+import json
 import logging
 import signal
 
diff --git a/wubloader/sheets.py b/wubloader/sheets.py
index 9ae1ec5..bef1f6b 100644
--- a/wubloader/sheets.py
+++ b/wubloader/sheets.py
@@ -1,9 +1,14 @@
-
-
 """The classes in the file wrap the gspread API to present a simpler interface,
 which transparently handles re-connecting, sheets schemas and tracking rows by id.
 """
 
+
+import gevent.lock
+
+from oauth2client.client import SignedJwtAssertionCredentials
+import gspread
+
+
 # schemas maps sheet name to schema.
 # each schema contains a map from column names to column indexes (1-based)
 SCHEMAS = {
@@ -12,10 +17,32 @@ SCHEMAS = {
         "heartbeat": 2,
     },
     "chunks": {
-        # TODO
+        "start": 1,
+        "end": 2,
+        "description": 4,
+        "link": 5,
+        "state": 6,
+        "uploader": 7,
+        "notes": 8,
+        "id": 9,
+        "cut_time": 10,
+        "upload_time": 11,
+        "duration": 12,
     },
     "main": {
-        # TODO
+        "start": 1,
+        "end": 2,
+        "description": 4,
+        "link": 7,
+        "state": 8,
+        "location": 9,
+        "uploader": 10,
+        "notes": 12,
+        "id": 14,
+        "draft_time": 15,
+        "cut_time": 16,
+        "upload_time": 17,
+        "duration": 18,
     },
 }
 
@@ -90,7 +117,7 @@ class Sheet(object):
     def __iter__(self):
         with self.manager.lock:
             self.manager.refresh()
-            return [Row(self, schema, i+1, r) for i, r in enumerate(self.worksheet.get_all_values())]
+            return [Row(self, self.schema, i+1, r) for i, r in enumerate(self.worksheet.get_all_values())]
 
     def find_row(self, id):
         for row in self:
@@ -101,7 +128,7 @@ class Sheet(object):
     def by_index(self, index):
         with self.manager.lock:
             self.manager.refresh()
-            return Row(self, schema, index, self.worksheet.row_values(index))
+            return Row(self, self.schema, index, self.worksheet.row_values(index))
 
     def append(self, id, **values):
         """Create new row with given initial values, and return it."""
@@ -152,6 +179,7 @@ class Row(object):
         # Checking our position again afterwards. If it's changed, we probably mis-wrote.
         if self.id:
             before = self.refresh()
+            # TODO handle before = None
         else:
             before = self
         for name, value in values.items():
diff --git a/wubloader/states.py b/wubloader/states.py
new file mode 100644
index 0000000..1f23f94
--- /dev/null
+++ b/wubloader/states.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+
+QUEUED = "[✓] Queued"
+PROCESSING_VIDEO = "[^] Processing Video"
+AWAITING_EDITS = "[✓] Awaiting Edits"
+EDITS_QUEUED = "[✓] Edits Queued"
+PROCESSING_EDITS = "[^] Processing Edits"
+UPLOADING = "[^] Uploading"
+PUBLISHED = "[✓] Published"
+ERROR = "[❌] Error"
+
+
+# Map {name: (ready, *in progress, complete)} state flows.
+# Note that due to bot deaths, etc, it can be pretty easy for something to be in an in-progress state
+# but not actually be in progress. We treat in progress and ready states as basically equivalent and only
+# existing for human feedback. Our actual in progress indicator comes from the uploader field,
+# which can be ignored if the uploader in question is dead.
+FLOWS = {
+    'draft': (QUEUED, PROCESSING_VIDEO, AWAITING_EDITS),
+    'publish': (EDITS_QUEUED, PROCESSING_EDITS, UPLOADING, PUBLISHED),
+    'chunk': (QUEUED, PROCESSING_VIDEO, UPLOADING, PUBLISHED),
+}
+CHUNK_FLOWS = ('chunk',)
+MAIN_FLOWS = ('draft', 'publish')
+
+
+# Whether this is a state we want to act on, defined as any non-complete state.
+def is_actionable(sheet_type, state):
+    flows = CHUNK_FLOWS if sheet_type == 'chunks' else MAIN_FLOWS
+    for name in flows:
+        flow = FLOWS[name]
+        if state in flow[:-1]:
+            return True
+    return False
+
+
+# General map from in-progress states to the state to rollback to.
+# For non-in-progress states, we just map them to themselves.
+def rollback(sheet_type, state):
+    flows = CHUNK_FLOWS if sheet_type == 'chunks' else MAIN_FLOWS
+    for name in flows:
+        flow = FLOWS[name]
+        for s in flow[1:-1]: # for each in progress state
+            if s == state:
+                return flow[0] # return the first state in the flow
+    return state # if no in progress state matches, return same state
diff --git a/wubloader/wubloader.py b/wubloader/wubloader.py
index a3884b7..a2c64d7 100644
--- a/wubloader/wubloader.py
+++ b/wubloader/wubloader.py
@@ -4,21 +4,45 @@ The central management class which everything else is run from.
 Its lifecycle is managed directly by main().
 """
 
+from calendar import timegm
+import logging
+import time
+import socket
+
+import gevent.event
+import gevent.pool
+
+from .heartbeat import Heartbeat
+from .job import Job
+from .sheets import SheetsManager
+from . import states
+
 
 class Wubloader(object):
+    JOBS_POLL_INTERVAL = 0.5
+
     def __init__(self, config):
         self.config = config
 
-        self.id = config.get('name', socket.gethostname())
+        self.bustime_base = timegm(time.strptime(config['bustime_start'], '%Y-%m-%dT%H:%M:%SZ'))
+
+        self.name = config.get('name', socket.gethostname())
         self.sheets = SheetsManager(config['sheets'], config['creds'])
 
         self.stopping = False
         self.stopped = gevent.event.Event()
 
+        # self.group contains all sub-greenlets and is used to ensure they're all shut down before exiting
         self.group = gevent.pool.Group()
+        # self.job is kept as a separate reference here so it's cancellable
         self.job = None
+        # self.uploads is a group tracking all currently ongoing uploads.
+        # note it's a subset of self.group
+        self.uploads = gevent.pool.Group()
+
+        self.heartbeat = Heartbeat(self.sheets['heartbeat'], self.name, self.group)
 
-        self.group.spawn(self._run)
+        gevent.spawn(self._run)
 
     def stop(self):
         """Tell wubloader to gracefully stop by finishing current jobs but starting no new ones."""
@@ -28,18 +52,58 @@ class Wubloader(object):
         """Tell wubloader to forcefully stop by cancelling current jobs."""
         if self.job:
             self.job.cancel()
+        self.uploads.kill(block=False)
 
     def _run(self):
         # clean up in case of prior unclean shutdown
         self.cleanup_existing()
 
-        with Heartbeat(self.sheets['heartbeat'], self.name, self.group):
+        # heartbeat will periodically update a sheet to indicate we're alive,
+        # and tell us who else is alive.
+        with self.heartbeat:
             while not self.stopping:
                 for job in self.find_jobs():
-                    # TODO if we're not doing it, handle this and continue
-                    # TODO if we're doing it, create Job and set self.job
-                    # TODO wait for it to finish
-                    # TODO break, to check stopping and restart job list from beginning
+
+                    # If it's already claimed, ignore it.
+                    # Note this check considers a claim by a dead bot to be invalid.
+                    if job.uploader:
+                        continue
+
+                    # If we're not allowed to claim it, ignore it.
+                    if self.name.lower() in job.excluded:
+                        continue
+
+                    # Acceptance checks
+                    try:
+                        # Checking duration exercises start time and end time parsing,
+                        # which raise ValueError if they're bad.
+                        if job.duration <= 0:
+                            raise ValueError("Duration is {} sec, which is <= 0".format(job.duration))
+                    except ValueError as e:
+                        # Note that as acceptance checks are fixable, we do not put job into an error state.
+                        # Job will proceed as soon as it's fixed.
+                        # We only inform user of errors if notes field is blank to avoid overwriting more important info.
+                        if not job.row.notes:
+                            job.row.update(notes="Acceptance check failed: {}".format(e))
+                        continue
+                    # Acceptance tests passed, remove existing note on failed checks if present
+                    if job.row.notes.startswith("Acceptance check failed: "):
+                        job.row.update(notes="")
+
+                    # Do we have all the data?
+                    # TODO if we don't, check if end time is recent. if so, skip for now.
+                    # if not, initiate claim-with-holes process
+
+                    # We've claimed the job, process it.
+                    self.job = job
+                    self.job.process()
+
+                    # Exit the loop to check stopping and restart our scan for eligible jobs.
+                    break
+
+                else:
+                    # We reached the end of the jobs list and didn't find any jobs to do
+                    gevent.sleep(self.JOBS_POLL_INTERVAL)
 
         # wait for any remaining tasks to finish
         self.group.join()
@@ -47,5 +111,22 @@ class Wubloader(object):
         self.stopped.set()
 
     def cleanup_existing(self):
-        """Scan for any existing jobs claimed by us, and cancel them."""
-        # TODO
+        """Scan for any existing rows claimed by us, and cancel them."""
+        for sheet_type in ('main', 'chunks'):
+            for sheet in self.sheets[sheet_type]:
+                for row in sheet:
+                    if row.uploader == self.name:
+                        logging.warning("Found existing claimed job for us, clearing")
+                        # rollback() needs the sheet type to know which state flows apply
+                        row.update(uploader="", state=states.rollback(sheet_type, row.state))
+
+    def find_jobs(self):
+        """Return potential jobs (based only on state), in priority order (most important first)."""
+        jobs = []
+        for sheet_type in ('main', 'chunks'):
+            for sheet in self.sheets[sheet_type]:
+                for row in sheet:
+                    if states.is_actionable(sheet_type, row.state):
+                        jobs.append(Job(self, sheet_type == 'chunks', sheet, row))
+        # Job.priority compares larger for more important jobs, so sort descending
+        return sorted(jobs, key=lambda job: job.priority, reverse=True)