mirror of https://github.com/ekimekim/wubloader
Some initial stuff
@ -0,0 +1,35 @@
class Heartbeat(object):
"""Periodically writes current time to the cell associated with this instance,
indicating it's still alive.
The given sheet should be rows of the form:
instance, last update time (in epoch)
This instance will be added if it doesn't already exist.
This class is a context manager and will run until exit.
def __init__(self, sheet, name, group):
self.sheet = sheet
self.name = name
self.stopped = gevent.event.Event()
def __enter__(self):
self.worker = group.spawn(self._run)
return self
def __exit__(self, *exc_info):
def _run(self):
row = self.sheet[self.name]
if not row:
# it doesn't already exist, create it
row = self.sheet.append(id=self.name, heartbeat=time.time())
while not self.stopped.wait(self.HEARTBEAT_INTERVAL):
# clear the heartbeat to indicate we're stopping
@ -1,14 +1,50 @@
import logging
import signal
import gevent
from .wubloader import Wubloader
# Verbose format but includes all extra info
LOG_FORMAT = "[%(asctime)s] %(levelname)s pid:%(process)d tid:%(thread)d %(name)s (%(pathname)s:%(funcName)s:%(lineno)d): %(message)s"
def main(conf_file, log_level='INFO'):
def main(conf_file, log_level='INFO', backdoor=None):
logging.basicConfig(level=log_level, format=LOG_FORMAT)
with open(conf_file) as f:
config = json.load(f)
wubloader = Wubloader(config)
# debugging backdoor listening on given localhost port
if backdoor:
gevent.backdoor.BackdoorServer(('localhost', int(backdoor)), locals=locals()).start()
# Set up a queue to receive and handle incoming graceful shutdown signals
signal_queue = gevent.queue.Queue()
def on_signal():
# Gracefully shut down on INT or TERM
for sig in (signal.SIGINT, signal.SIGTERM):
gevent.signal(sig, on_signal)
# Since forcefully raising a KeyboardInterrupt to see where you're stuck is useful for debugging,
# remap that to SIGQUIT.
# Note that signal.signal() will run _immediately_ whereas gevent.signal() waits until current
# greenlet is blocking.
def raise_interrupt(frame, signum):
raise KeyboardInterrupt
signal.signal(signal.SIGQUIT, raise_interrupt)
signal_queue.get() # block until shutdown
logging.info("Interrupt received. Finishing existing jobs and exiting. Interrupt again to exit immediately.")
# block until shutdown complete OR second shutdown signal
waiter = gevent.spawn(signal_queue.get)
gevent.wait([wubloader.stopped, waiter], count=1)
if not wubloader.stopped.ready():
logging.warning("Second interrupt recieved. Doing basic cleanup and exiting immediately.")
@ -0,0 +1,166 @@
"""The classes in the file wrap the gspread API to present a simpler interface,
which transparently handles re-connecting, sheets schemas and tracking rows by id.
# schemas maps sheet name to schema.
# each schema contains a map from column names to column indexes (1-based)
"heartbeat": {
"id": 1,
"heartbeat": 2,
"chunks": {
"main": {
class SheetsManager(object):
Allows each kind of named sheet to be accessed via item lookup, eg. sheets["heartbeat"].
Sheet names given under IS_SINGLE return single sheets, others return a list of sheets.
IS_SINGLE = ["heartbeat"]
_client = None
def __init__(self, sheet_configs, creds):
sheet_configs should be a map from each sheet name to a tuple (if name in IN_SINGLE),
or list of tuples (sheet_id, worksheet_title), indicating the sheet id and
worksheet name of each worksheet to be associated with that name.
creds should be a map containing the keys private_key and client_email,
as required by google's auth.
self._creds = SignedJwtAssertionCredentials(
# gspread library may not be threadsafe, so for safety we enclose all accesses with this lock
self.lock = gevent.lock.RLock()
# all client usage should be wrapped in manager.lock and begin with manager.refresh()
self.client = gspread.authorize(self._creds)
self.sheets = {}
for name, config in sheet_configs.items():
if name in self.IS_SINGLE:
self.sheets[name] = Sheet(self, SCHEMAS[name], *config)
self.sheets[name] = [Sheet(self, SCHEMAS[name], *c) for c in config]
def refresh(self):
"""Checks if client auth needs refreshing, and does so if needed."""
if self._creds.get_access_token().expires_in < self.REFRESH_THRESHOLD:
self._client.login() # refresh creds
def __getitem__(self, item):
return self.sheets[item]
class Sheet(object):
"""Represents a single worksheet. Rows can be looked up by id by getitem: row = sheet[id].
This will return None if the id cannot be found.
Rows can be created by append().
You can search through all rows by iterating over this sheet.
def __init__(self, manager, schema, sheet_id, worksheet_title):
self.manager = manager
self.schema = schema
self.sheet_id = sheet_id
self.worksheet_title = worksheet_title
self.worksheet = self.manager.client.open_by_key(sheet_id).worksheet(worksheet_title)
def __repr__(self):
return "<{cls.__name__} {self.sheet_id!r}/{self.worksheet_title!r} at {id:x}>".format(cls=type(self), self=self, id=id(self))
__str__ = __repr__
def __getitem__(self, item):
return self.find_row(item)
def __iter__(self):
with self.manager.lock:
return [Row(self, schema, i+1, r) for i, r in enumerate(self.worksheet.get_all_values())]
def find_row(self, id):
for row in self:
if row.id == id:
return row
return None
def by_index(self, index):
with self.manager.lock:
return Row(self, schema, index, self.worksheet.row_values(index))
def append(self, id, **values):
"""Create new row with given initial values, and return it."""
values['id'] = id
rendered = ["" for _ in range(max(self.schema.values()))]
for name, value in values.items():
rendered[self.schema[name]-1] = value
with self.manager.lock:
row = self[id]
if not row:
raise Exception("Unrecoverable race condition: Created new row with id {} but then couldn't find it".format(id))
return row
class Row(object):
"""Represents a row in a sheet. Values can be looked up by attribute.
Values can be updated with update(attr=value), which returns the updated row.
Note that a row must have an id to be updatable. Updating is permitted if no id is set
only if id is one of the values being written.
You can also refresh the row (if it has an id) by calling row.refresh(),
which returns the newly read row, or None if it can no longer be found.
def __init__(self, sheet, schema, index, values):
self.sheet = sheet
self.manager = sheet.manager
self.schema = schema
self.index = index
self.values = values
def __repr__(self):
return "<{cls.__name__} {self.id}({self.index}) of {self.sheet} at {id:x}>".format(cls=type(self), self=self, id=id(self))
__str__ = __repr__
def __getattr__(self, attr):
col = self.schema[attr]
if len(self.values) > col:
return self.values[col]
return ""
def update(self, **values):
with self.manager.lock:
# We attempt to detect races by:
# Always refreshing our position before we begin (if we can)
# Checking our position again afterwards. If it's changed, we probably mis-wrote.
if self.id:
before = self.refresh()
before = self
for name, value in values.items():
col = self.schema[name]
self.sheet.worksheet.update_cell(before.index, col, value)
after = self.sheet.by_index(before.index)
new_id = values['id'] if 'id' in values else self.id
if after.id != new_id:
raise Exception("Likely bad write: Row {} may have had row {} data partially written: {}".format(after, before, values))
def refresh(self):
return self.sheet[self.id]
@ -0,0 +1,51 @@
The central management class which everything else is run from.
Its lifecycle is managed directly by main().
class Wubloader(object):
def __init__(self, config):
self.config = config
self.id = config.get('name', socket.gethostname())
self.sheets = SheetsManager(config['sheets'], config['creds'])
self.stopping = False
self.stopped = gevent.event.Event()
self.group = gevent.pool.Group()
self.job = None
def stop(self):
"""Tell wubloader to gracefully stop by finishing current jobs but starting no new ones."""
self.stopping = True
def cancel_all(self):
"""Tell wubloader to forcefully stop by cancelling current jobs."""
if self.job:
def _run(self):
# clean up in case of prior unclean shutdown
with Heartbeat(self.sheets['heartbeat'], self.name, self.group):
while not self.stopping:
for job in self.find_jobs():
# TODO if we're not doing it, handle this and continue
# TODO if we're doing it, create Job and set self.job
# TODO wait for it to finish
# TODO break, to check stopping and restart job list from beginning
# wait for any remaining tasks to finish
# indicate that we're done
def cleanup_existing(self):
"""Scan for any existing jobs claimed by us, and cancel them."""
Reference in New Issue