archive/original-v2.1
Mike Lang 6 years ago
parent 3280517618
commit dcb6d6ba6d

@ -1,4 +1,8 @@
import logging
import time
import gevent
class Heartbeat(object): class Heartbeat(object):
@ -8,9 +12,14 @@ class Heartbeat(object):
instance, last update time (in epoch) instance, last update time (in epoch)
This instance will be added if it doesn't already exist. This instance will be added if it doesn't already exist.
Also keeps track of what other bots are alive in self.alive
This class is a context manager and will run until exit. This class is a context manager and will run until exit.
""" """
# How often to refresh our heartbeat
HEARTBEAT_INTERVAL = 1 HEARTBEAT_INTERVAL = 1
# How old other bots heartbeat needs to be to consider them dead
HEARTBEAT_THRESHOLD = 10
def __init__(self, sheet, name, group): def __init__(self, sheet, name, group):
self.sheet = sheet self.sheet = sheet
@ -18,7 +27,8 @@ class Heartbeat(object):
self.stopped = gevent.event.Event() self.stopped = gevent.event.Event()
def __enter__(self): def __enter__(self):
self.worker = group.spawn(self._run) self.alive = self._get_alive() # do one now to prevent a race where it's read before it's written
self.worker = self.group.spawn(self._run)
return self return self
def __exit__(self, *exc_info): def __exit__(self, *exc_info):
@ -31,5 +41,24 @@ class Heartbeat(object):
row = self.sheet.append(id=self.name, heartbeat=time.time()) row = self.sheet.append(id=self.name, heartbeat=time.time())
while not self.stopped.wait(self.HEARTBEAT_INTERVAL): while not self.stopped.wait(self.HEARTBEAT_INTERVAL):
row.update(heartbeat=time.time()) row.update(heartbeat=time.time())
self.alive = self._get_alive()
# clear the heartbeat to indicate we're stopping # clear the heartbeat to indicate we're stopping
row.update(heartbeat="") row.update(heartbeat="")
def _get_alive(self):
    """Scan the heartbeat sheet and return the set of bot ids considered alive.

    A bot is alive if its row has a non-blank id and a heartbeat no older than
    HEARTBEAT_THRESHOLD seconds. Rows with a blank heartbeat are skipped
    silently - a blank is written deliberately on clean shutdown (see _run),
    so it is not a data error worth warning about. Genuinely unparseable
    heartbeat values are logged and skipped.
    """
    alive = set()
    # Hoisted so every row is judged against the same instant, instead of a
    # clock that drifts as we iterate.
    now = time.time()
    for row in self.sheet:
        if not row.id:
            continue
        # Blank heartbeat: bot cleared it on clean shutdown. Dead, but expected.
        if not str(row.heartbeat).strip():
            continue
        try:
            heartbeat = float(row.heartbeat)
        except ValueError:
            logging.warning("Invalid heartbeat value for row {}: {!r}".format(row, row.heartbeat))
            continue
        age = now - heartbeat
        if age > self.HEARTBEAT_THRESHOLD:
            logging.debug("Considering {} dead: heartbeat too old at {} sec".format(row.id, age))
            continue
        alive.add(row.id)
    return alive

@ -0,0 +1,109 @@
import logging
from . import states
class Job(object):
    """A job wraps a row and represents a video cutting job to do."""

    # How often (seconds) to check if someone else has claimed a row out from underneath us.
    OWNERSHIP_CHECK_INTERVAL = 1

    def __init__(self, wubloader, is_chunk, sheet, row):
        """Args:
            wubloader: the owning Wubloader instance (provides group, heartbeat, bustime_base).
            is_chunk: True for the chunk flow; otherwise row.state picks draft vs publish.
            sheet: the Sheet the row came from.
            row: the Row describing the cut.
        """
        self.wubloader = wubloader
        if is_chunk:
            self.job_type = 'chunk'
        elif row.state in states.FLOWS['draft']:
            self.job_type = 'draft'
        else:
            assert row.state in states.FLOWS['publish']
            self.job_type = 'publish'
        self.sheet = sheet
        self.row = row
        # Set by process(). Initialised here so cancel() is safe to call even
        # if process() was never started (previously an AttributeError).
        self.worker = None

    @property
    def priority(self):
        """User-set priority is most important, followed by type, then earliest first."""
        type_priority = ['chunk', 'draft', 'publish']  # low to high priority
        return (
            getattr(self.row, 'priority', 0),  # chunks don't have priority, default to 0
            type_priority.index(self.job_type),
            -self.sheet.id,  # sheet index, low number is high priority
            -self.row.index,  # row index, low number is high priority
        )

    @property
    def uploader(self):
        """A processed uploader check that ignores dead bots."""
        return self.row.uploader if self.row.uploader in self.wubloader.heartbeat.alive else ""

    @property
    def excluded(self):
        """Bots that may not claim this row, as a list. NOTE: All lowercase."""
        if not self.row.excluded.strip():
            return []
        return [name.strip().lower() for name in self.row.excluded.split(',')]

    @property
    def start_time(self):
        """Start of the cut in epoch seconds; raises ValueError with context on bad input."""
        try:
            return parse_bustime(self.wubloader.bustime_base, self.row.start_time)
        except ValueError as e:
            raise ValueError("Start time: {}".format(e))

    @property
    def end_time(self):
        """End of the cut in epoch seconds; raises ValueError with context on bad input."""
        try:
            return parse_bustime(self.wubloader.bustime_base, self.row.end_time)
        except ValueError as e:
            raise ValueError("End time: {}".format(e))

    @property
    def duration(self):
        """Length of the cut in seconds. May raise ValueError if either time is malformed."""
        return self.end_time - self.start_time

    def cancel(self):
        """Cancel job that is currently being processed, setting it back to its starting state."""
        # Guard: if process() was never called there is no worker to stop.
        if self.worker is not None and not self.worker.ready():
            # By setting uploader to blank, the watchdog will stop the in-progress job.
            self.row.update(state=states.FLOWS[self.job_type][0], uploader="")

    def process(self):
        """Call this to perform the job."""
        # We do the actual work in a seperate greenlet so we can easily cancel it.
        self.worker = self.wubloader.group.spawn(self._process)
        # While that does the real work, we poll the uploader field to check no-one else has stolen it.
        while not self.worker.ready():
            row = self.row.refresh()
            if row is None or row.uploader != self.row.uploader:
                # Our row's been stolen, cancelled, or just plain lost.
                # Abort with no rollback - let them have it.
                logging.warning("Job {} aborted: Row {} is {}".format(self, self.row,
                    "gone" if row is None
                    else "cancelled" if row.uploader == ""
                    else "claimed by {}".format(row.uploader)
                ))
                self.worker.kill(block=True)
                return
            # Sleep until either worker is done or interval has passed
            self.worker.join(self.OWNERSHIP_CHECK_INTERVAL)

    def _process(self):
        """Does the actual cutting work. You should call process() instead."""
        # TODO
def parse_bustime(base, value):
    """Parse a bustime string of the form "HH:MM" or "HH:MM:SS" and
    return base plus the parsed offset in seconds.

    Raises ValueError if the string is not in either format (including
    non-numeric fields, which fail int()/float() conversion).
    """
    fields = value.strip().split(':')
    if len(fields) not in (2, 3):
        raise ValueError("Bad format: Must be HH:MM or HH:MM:SS")
    hours = int(fields[0])
    if len(fields) == 2:
        # HH:MM - minutes may be fractional, no seconds field.
        mins = float(fields[1])
        secs = 0
    else:
        # HH:MM:SS - seconds may be fractional.
        mins = int(fields[1])
        secs = float(fields[2])
    return base + hours * 3600 + mins * 60 + secs

@ -1,4 +1,5 @@
import json
import logging import logging
import signal import signal

@ -1,9 +1,14 @@
"""The classes in the file wrap the gspread API to present a simpler interface, """The classes in the file wrap the gspread API to present a simpler interface,
which transparently handles re-connecting, sheets schemas and tracking rows by id. which transparently handles re-connecting, sheets schemas and tracking rows by id.
""" """
import gevent.lock
from oauth2client.client import SignedJwtAssertionCredentials
import gspread
# schemas maps sheet name to schema. # schemas maps sheet name to schema.
# each schema contains a map from column names to column indexes (1-based) # each schema contains a map from column names to column indexes (1-based)
SCHEMAS = { SCHEMAS = {
@ -12,10 +17,32 @@ SCHEMAS = {
"heartbeat": 2, "heartbeat": 2,
}, },
"chunks": { "chunks": {
# TODO "start": 1,
"end": 2,
"description": 4,
"link": 5,
"state": 6,
"uploader": 7,
"notes": 8,
"id": 9,
"cut_time": 10,
"upload_time": 11,
"duration": 12,
}, },
"main": { "main": {
# TODO "start": 1,
"end": 2,
"description": 4,
"link": 7,
"state": 8,
"location": 9,
"uploader": 10,
"notes": 12,
"id": 14,
"draft_time": 15,
"cut_time": 16,
"upload_time": 17,
"duration": 18,
}, },
} }
@ -90,7 +117,7 @@ class Sheet(object):
def __iter__(self):
    """Iterate over all rows of the sheet as Row objects, refreshing first.

    Note: __iter__ must return an *iterator*; returning the list directly
    would make `for row in sheet` (see find_row) raise TypeError, since a
    list is an iterable but not an iterator.
    """
    with self.manager.lock:
        self.manager.refresh()
        return iter([
            Row(self, self.schema, i+1, r)
            for i, r in enumerate(self.worksheet.get_all_values())
        ])
def find_row(self, id): def find_row(self, id):
for row in self: for row in self:
@ -101,7 +128,7 @@ class Sheet(object):
def by_index(self, index): def by_index(self, index):
with self.manager.lock: with self.manager.lock:
self.manager.refresh() self.manager.refresh()
return Row(self, schema, index, self.worksheet.row_values(index)) return Row(self, self.schema, index, self.worksheet.row_values(index))
def append(self, id, **values): def append(self, id, **values):
"""Create new row with given initial values, and return it.""" """Create new row with given initial values, and return it."""
@ -152,6 +179,7 @@ class Row(object):
# Checking our position again afterwards. If it's changed, we probably mis-wrote. # Checking our position again afterwards. If it's changed, we probably mis-wrote.
if self.id: if self.id:
before = self.refresh() before = self.refresh()
# TODO handle before = None
else: else:
before = self before = self
for name, value in values.items(): for name, value in values.items():

@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-

QUEUED = "[✓] Queued"
PROCESSING_VIDEO = "[^] Processing Video"
AWAITING_EDITS = "[✓] Awaiting Edits"
EDITS_QUEUED = "[✓] Edits Queued"
PROCESSING_EDITS = "[^] Processing Edits"
UPLOADING = "[^] Uploading"
PUBLISHED = "[✓] Published"
ERROR = "[❌] Error"

# Map {name: (ready, *in progress, complete)} state flows.
# Note that due to bot deaths, etc, it can be pretty easy for something to be in an in-progress state
# but not actually be in progress. We treat in progress and ready states as basically equivalent and only
# existing for human feedback. Our actual in progress indicator comes from the uploader field,
# which can be ignored if the uploader in question is dead.
FLOWS = {
    'draft': (QUEUED, PROCESSING_VIDEO, AWAITING_EDITS),
    'publish': (EDITS_QUEUED, PROCESSING_EDITS, UPLOADING, PUBLISHED),
    'chunk': (QUEUED, PROCESSING_VIDEO, UPLOADING, PUBLISHED),
}

CHUNK_FLOWS = ('chunk',)
MAIN_FLOWS = ('draft', 'publish')


def _flows_for(sheet_type):
    """Return the tuple of flow names that apply to the given sheet type."""
    return CHUNK_FLOWS if sheet_type == 'chunks' else MAIN_FLOWS


def is_actionable(sheet_type, state):
    """Whether this is a state we want to act on, defined as any non-complete state."""
    # Everything but the final state of each flow counts as actionable.
    return any(state in FLOWS[name][:-1] for name in _flows_for(sheet_type))


def rollback(sheet_type, state):
    """Map an in-progress state to the state it should roll back to.

    States that aren't in-progress (ready, complete, or unknown) map to themselves.
    """
    for name in _flows_for(sheet_type):
        flow = FLOWS[name]
        if state in flow[1:-1]:  # in-progress states only
            return flow[0]  # roll back to the flow's ready state
    return state

@ -4,21 +4,45 @@ The central management class which everything else is run from.
Its lifecycle is managed directly by main(). Its lifecycle is managed directly by main().
""" """
from calendar import timegm
import logging
import time
import socket
import gevent.event
import gevent.pool
from .heartbeat import Heartbeat
from .job import Job
from .sheets import SheetsManager
from . import states
class Wubloader(object): class Wubloader(object):
JOBS_POLL_INTERVAL = 0.5
def __init__(self, config): def __init__(self, config):
self.config = config self.config = config
self.id = config.get('name', socket.gethostname()) self.bustime_base = timegm(time.strptime(config['bustime_start'], '%Y-%m-%dT%H:%M:%SZ'))
self.name = config.get('name', socket.gethostname())
self.sheets = SheetsManager(config['sheets'], config['creds']) self.sheets = SheetsManager(config['sheets'], config['creds'])
self.stopping = False self.stopping = False
self.stopped = gevent.event.Event() self.stopped = gevent.event.Event()
# self.group contains all sub-greenlets and is used to ensure they're all shut down before exiting
self.group = gevent.pool.Group() self.group = gevent.pool.Group()
# self.job is kept as a seperate reference here so it's cancellable
self.job = None self.job = None
# self.uploads is a group tracking all currently ongoing uploads.
# note it's a subset of self.group
self.uploads = gevent.pool.Group()
self.heartbeat = Heartbeat(self.sheets['heartbeat'], self.name, self.group)
self.group.spawn(self._run) gevent.spawn(self._run)
def stop(self): def stop(self):
"""Tell wubloader to gracefully stop by finishing current jobs but starting no new ones.""" """Tell wubloader to gracefully stop by finishing current jobs but starting no new ones."""
@ -28,18 +52,58 @@ class Wubloader(object):
"""Tell wubloader to forcefully stop by cancelling current jobs.""" """Tell wubloader to forcefully stop by cancelling current jobs."""
if self.job: if self.job:
self.job.cancel() self.job.cancel()
self.uploads.kill(block=False)
def _run(self): def _run(self):
# clean up in case of prior unclean shutdown # clean up in case of prior unclean shutdown
self.cleanup_existing() self.cleanup_existing()
with Heartbeat(self.sheets['heartbeat'], self.name, self.group): # heartbeat will periodically update a sheet to indicate we're alive,
# and tell us who else is alive.
with self.heartbeat:
while not self.stopping: while not self.stopping:
for job in self.find_jobs(): for job in self.find_jobs():
# TODO if we're not doing it, handle this and continue
# TODO if we're doing it, create Job and set self.job # If it's already claimed, ignore it.
# TODO wait for it to finish # Note this check considers a claim by a dead bot to be invalid.
# TODO break, to check stopping and restart job list from beginning if job.uploader:
continue
# If we're not allowed to claim it, ignore it.
if self.name.lower() in job.excluded:
continue
# Acceptance checks
try:
# Checking duration exercises start time and end time parsing,
# which raise ValueError if they're bad.
if job.duration <= 0:
raise ValueError("Duration is {} sec, which is <= 0".format(job.duration))
except ValueError as e:
# Note that as acceptance checks are fixable, we do not put job into an error state.
# Job will proceed as soon as it's fixed.
# We only inform user of errors if notes field is blank to avoid overwriting more important info.
if not job.row.notes:
job.row.update(notes="Acceptance check failed: {}".format(e))
continue
# Acceptance tests passed, remove existing note on failed checks if present
if job.row.notes.startswith("Acceptance check failed: "):
job.row.update(notes="")
# Do we have all the data?
# TODO if we don't, check if end time is recent. if so, skip for now.
# if not, initiate claim-with-holes process
# We've claimed the job, process it.
self.job = job
self.job.process()
# Exit the loop to check stopping and restart our scan for eligible jobs.
break
else:
# We reached the end of the jobs list and didn't find any jobs to do
gevent.sleep(self.JOBS_POLL_INTERVAL)
# wait for any remaining tasks to finish # wait for any remaining tasks to finish
self.group.join() self.group.join()
@ -47,5 +111,19 @@ class Wubloader(object):
self.stopped.set() self.stopped.set()
def cleanup_existing(self):
    """Scan for any existing rows claimed by us, and cancel them.

    Iterates per sheet type so states.rollback() can be given the sheet_type
    it requires - rollback(sheet_type, state) takes two arguments, so the
    previous single-argument call would have raised TypeError.
    """
    for sheet_type in ('main', 'chunks'):
        for sheet in self.sheets[sheet_type]:
            for row in sheet:
                if row.uploader == self.name:
                    logging.warning("Found existing claimed job for us, clearing")
                    row.update(uploader="", state=states.rollback(sheet_type, row.state))
def find_jobs(self):
    """Return potential jobs (based only on state), in priority order."""
    jobs = []
    for sheet_type in ('main', 'chunks'):
        for sheet in self.sheets[sheet_type]:
            for row in sheet:
                # states exposes the function is_actionable(sheet_type, state);
                # there is no states.IS_ACTIONABLE collection, so the previous
                # `row.state in states.IS_ACTIONABLE` raised AttributeError.
                if states.is_actionable(sheet_type, row.state):
                    jobs.append(Job(self, sheet_type == 'chunks', sheet, row))
    return sorted(jobs, key=lambda job: job.priority)

Loading…
Cancel
Save