wubloader/wubloader/wubloader.py

"""
The central management class which everything else is run from.
Its lifecycle is managed directly by main().
"""
from calendar import timegm
import logging
import time
import socket

import gevent.event
import gevent.pool

from .heartbeat import Heartbeat
from .job import Job
from .sheets import SheetsManager
from . import states
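# Rough usage sketch (illustrative, not part of this module): main() is expected to
# build the config dict and drive the lifecycle along these lines. Only the config
# keys read in __init__ below ('bustime_start', 'name', 'sheets', 'creds') are known
# here; everything else about main() is an assumption.
#
#     wubloader = Wubloader(config)   # __init__ spawns the main loop via gevent
#     ...
#     wubloader.stop()                # graceful: finish current jobs, claim no new ones
#     wubloader.stopped.wait()        # block until the loop and all remaining tasks finish
#     # or, to abort in-flight work instead:
#     # wubloader.cancel_all()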
class Wubloader(object):
    JOBS_POLL_INTERVAL = 0.5

    def __init__(self, config):
        self.config = config
        self.bustime_base = timegm(time.strptime(config['bustime_start'], '%Y-%m-%dT%H:%M:%SZ'))
        self.name = config.get('name', socket.gethostname())
        self.sheets = SheetsManager(config['sheets'], config['creds'])

        self.stopping = False
        self.stopped = gevent.event.Event()
        # self.group contains all sub-greenlets and is used to ensure they're all shut down before exiting
        self.group = gevent.pool.Group()
        # self.job is kept as a separate reference here so it's cancellable
        self.job = None
        # self.uploads is a group tracking all currently ongoing uploads.
        # note it's a subset of self.group
        self.uploads = gevent.pool.Group()

        self.heartbeat = Heartbeat(self.sheets['heartbeat'], self.name, self.group)
        gevent.spawn(self._run)

    def stop(self):
        """Tell wubloader to gracefully stop by finishing current jobs but starting no new ones."""
        self.stopping = True

    def cancel_all(self):
        """Tell wubloader to forcefully stop by cancelling current jobs."""
        if self.job:
            self.job.cancel()
        self.uploads.kill(block=False)

    def _run(self):
        # clean up in case of prior unclean shutdown
        self.cleanup_existing()
        # heartbeat will periodically update a sheet to indicate we're alive,
        # and tell us who else is alive.
        with self.heartbeat:
            while not self.stopping:
                for job in self.find_jobs():
                    # If it's already claimed (except by us), ignore it.
                    # Note this check considers a claim by a dead bot to be invalid (except for publishes).
                    if job.uploader and job.uploader != self.name:
                        continue

                    # If we're not allowed to claim it, ignore it.
                    if self.name.lower() in job.excluded:
                        continue

                    # Acceptance checks
                    try:
                        # Checking duration exercises start time and end time parsing,
                        # which raise ValueError if they're bad.
                        if job.duration <= 0:
                            raise ValueError("Duration is {} sec, which is <= 0".format(job.duration))
                    except ValueError as e:
                        # Note that as acceptance checks are fixable, we do not put the job into an error state.
                        # The job will proceed as soon as it's fixed.
                        # We only inform the user of errors if the notes field is blank, to avoid overwriting more important info.
                        if not job.row.notes:
                            job.row.update(notes="Acceptance check failed: {}".format(e))
                        continue

                    # Acceptance checks passed, remove any existing note about failed checks
                    if job.row.notes.startswith("Acceptance check failed: "):
                        job.row.update(notes="")
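                    # Illustrative example: a row whose end time is not after its start time gives
                    # duration <= 0, so "Acceptance check failed: ..." is written to an empty notes
                    # field and the row is skipped; once the row is corrected, the note is cleared
                    # above and the job can be claimed on a later scan.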
                    # Do we have all the data?
                    # TODO if we don't, check if end time is recent. if so, skip for now.
                    # if not, initiate claim-with-holes process

                    # We've claimed the job, process it.
                    self.job = job
                    self.job.process()

                    # Exit the loop to check stopping and restart our scan for eligible jobs.
                    break
                else:
                    # We reached the end of the jobs list and didn't find any jobs to do
                    gevent.sleep(self.JOBS_POLL_INTERVAL)

        # wait for any remaining tasks to finish
        self.group.join()
        # indicate that we're done
        self.stopped.set()

    def cleanup_existing(self):
        """Scan for any existing non-publish rows claimed by us, and cancel them."""
        for job in self.find_jobs():
            if job.row.uploader == self.name and job.row.state != states.rollback(job.row.state):
                logging.warning("Found existing in progress job for us, clearing")
                job.row.update(state=states.rollback(job.row.state))
                if job.job_type != 'publish':
                    job.row.update(uploader="")
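    # (Assumption from the usage above: states.rollback() maps an in-progress state back to its
    # pre-claim state, so a row whose state equals its own rollback is already settled and is
    # left untouched.)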
    def find_jobs(self):
        """Return potential jobs (based only on state), in priority order."""
        jobs = []
        for sheet_type in ('main', 'chunks'):
            for sheet in self.sheets[sheet_type]:
                for row in sheet:
                    if row.state in states.IS_ACTIONABLE:
                        jobs.append(Job(self, sheet_type == 'chunks', sheet, row))
        return sorted(jobs, key=lambda job: job.priority)
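# Note: sorted() is ascending, so jobs with the smallest priority value are attempted first,
# and rows from the 'main' and 'chunks' sheets are interleaved purely by that value.
# (How Job computes priority is defined in the job module, not here.)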