From 32805176188c3884c8e8b94de775a5d19131de50 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 10 Sep 2018 05:55:46 -0700 Subject: [PATCH] Some initial stuff --- wubloader/heartbeat.py | 35 +++++++++ wubloader/main.py | 40 +++++++++- wubloader/sheets.py | 166 +++++++++++++++++++++++++++++++++++++++++ wubloader/wubloader.py | 51 +++++++++++++ 4 files changed, 290 insertions(+), 2 deletions(-) create mode 100644 wubloader/heartbeat.py create mode 100644 wubloader/sheets.py create mode 100644 wubloader/wubloader.py diff --git a/wubloader/heartbeat.py b/wubloader/heartbeat.py new file mode 100644 index 0000000..8a58843 --- /dev/null +++ b/wubloader/heartbeat.py @@ -0,0 +1,35 @@ + + + +class Heartbeat(object): + """Periodically writes current time to the cell associated with this instance, + indicating it's still alive. + The given sheet should be rows of the form: + instance, last update time (in epoch) + This instance will be added if it doesn't already exist. + + This class is a context manager and will run until exit. + """ + HEARTBEAT_INTERVAL = 1 + + def __init__(self, sheet, name, group): + self.sheet = sheet + self.name = name + self.stopped = gevent.event.Event() + + def __enter__(self): + self.worker = group.spawn(self._run) + return self + + def __exit__(self, *exc_info): + self.stopped.set() + + def _run(self): + row = self.sheet[self.name] + if not row: + # it doesn't already exist, create it + row = self.sheet.append(id=self.name, heartbeat=time.time()) + while not self.stopped.wait(self.HEARTBEAT_INTERVAL): + row.update(heartbeat=time.time()) + # clear the heartbeat to indicate we're stopping + row.update(heartbeat="") diff --git a/wubloader/main.py b/wubloader/main.py index bfc28ff..2e12b16 100644 --- a/wubloader/main.py +++ b/wubloader/main.py @@ -1,14 +1,50 @@ import logging +import signal + +import gevent + +from .wubloader import Wubloader + # Verbose format but includes all extra info LOG_FORMAT = "[%(asctime)s] %(levelname)s pid:%(process)d tid:%(thread)d %(name)s (%(pathname)s:%(funcName)s:%(lineno)d): %(message)s" -def main(conf_file, log_level='INFO'): +def main(conf_file, log_level='INFO', backdoor=None): logging.basicConfig(level=log_level, format=LOG_FORMAT) with open(conf_file) as f: config = json.load(f) - + wubloader = Wubloader(config) + + # debugging backdoor listening on given localhost port + if backdoor: + gevent.backdoor.BackdoorServer(('localhost', int(backdoor)), locals=locals()).start() + + # Set up a queue to receive and handle incoming graceful shutdown signals + signal_queue = gevent.queue.Queue() + def on_signal(): + signal_queue.put(None) + # Gracefully shut down on INT or TERM + for sig in (signal.SIGINT, signal.SIGTERM): + gevent.signal(sig, on_signal) + # Since forcefully raising a KeyboardInterrupt to see where you're stuck is useful for debugging, + # remap that to SIGQUIT. + # Note that signal.signal() will run _immediately_ whereas gevent.signal() waits until current + # greenlet is blocking. + def raise_interrupt(frame, signum): + raise KeyboardInterrupt + signal.signal(signal.SIGQUIT, raise_interrupt) + + signal_queue.get() # block until shutdown + logging.info("Interrupt received. Finishing existing jobs and exiting. Interrupt again to exit immediately.") + wubloader.stop() + + # block until shutdown complete OR second shutdown signal + waiter = gevent.spawn(signal_queue.get) + gevent.wait([wubloader.stopped, waiter], count=1) + if not wubloader.stopped.ready(): + logging.warning("Second interrupt recieved. Doing basic cleanup and exiting immediately.") + wubloader.cancel_all() diff --git a/wubloader/sheets.py b/wubloader/sheets.py new file mode 100644 index 0000000..9ae1ec5 --- /dev/null +++ b/wubloader/sheets.py @@ -0,0 +1,166 @@ + + +"""The classes in the file wrap the gspread API to present a simpler interface, +which transparently handles re-connecting, sheets schemas and tracking rows by id. +""" + +# schemas maps sheet name to schema. +# each schema contains a map from column names to column indexes (1-based) +SCHEMAS = { + "heartbeat": { + "id": 1, + "heartbeat": 2, + }, + "chunks": { + # TODO + }, + "main": { + # TODO + }, +} + + +class SheetsManager(object): + """ + Allows each kind of named sheet to be accessed via item lookup, eg. sheets["heartbeat"]. + Sheet names given under IS_SINGLE return single sheets, others return a list of sheets. + """ + IS_SINGLE = ["heartbeat"] + REFRESH_THRESHOLD = 120 + + _client = None + + def __init__(self, sheet_configs, creds): + """ + sheet_configs should be a map from each sheet name to a tuple (if name in IN_SINGLE), + or list of tuples (sheet_id, worksheet_title), indicating the sheet id and + worksheet name of each worksheet to be associated with that name. + + creds should be a map containing the keys private_key and client_email, + as required by google's auth. + """ + self._creds = SignedJwtAssertionCredentials( + service_account_name=creds['client_email'], + private_key=creds['private_key'], + scope=['https://spreadsheets.google.com/feeds'], + ) + + # gspread library may not be threadsafe, so for safety we enclose all accesses with this lock + self.lock = gevent.lock.RLock() + + # all client usage should be wrapped in manager.lock and begin with manager.refresh() + self.client = gspread.authorize(self._creds) + + self.sheets = {} + for name, config in sheet_configs.items(): + if name in self.IS_SINGLE: + self.sheets[name] = Sheet(self, SCHEMAS[name], *config) + else: + self.sheets[name] = [Sheet(self, SCHEMAS[name], *c) for c in config] + + def refresh(self): + """Checks if client auth needs refreshing, and does so if needed.""" + if self._creds.get_access_token().expires_in < self.REFRESH_THRESHOLD: + self._client.login() # refresh creds + + def __getitem__(self, item): + return self.sheets[item] + + +class Sheet(object): + """Represents a single worksheet. Rows can be looked up by id by getitem: row = sheet[id]. + This will return None if the id cannot be found. + Rows can be created by append(). + You can search through all rows by iterating over this sheet. + """ + def __init__(self, manager, schema, sheet_id, worksheet_title): + self.manager = manager + self.schema = schema + self.sheet_id = sheet_id + self.worksheet_title = worksheet_title + self.worksheet = self.manager.client.open_by_key(sheet_id).worksheet(worksheet_title) + + def __repr__(self): + return "<{cls.__name__} {self.sheet_id!r}/{self.worksheet_title!r} at {id:x}>".format(cls=type(self), self=self, id=id(self)) + __str__ = __repr__ + + def __getitem__(self, item): + return self.find_row(item) + + def __iter__(self): + with self.manager.lock: + self.manager.refresh() + return [Row(self, schema, i+1, r) for i, r in enumerate(self.worksheet.get_all_values())] + + def find_row(self, id): + for row in self: + if row.id == id: + return row + return None + + def by_index(self, index): + with self.manager.lock: + self.manager.refresh() + return Row(self, schema, index, self.worksheet.row_values(index)) + + def append(self, id, **values): + """Create new row with given initial values, and return it.""" + values['id'] = id + rendered = ["" for _ in range(max(self.schema.values()))] + for name, value in values.items(): + rendered[self.schema[name]-1] = value + with self.manager.lock: + self.manager.refresh() + self.worksheet.append_row(rendered) + row = self[id] + if not row: + raise Exception("Unrecoverable race condition: Created new row with id {} but then couldn't find it".format(id)) + return row + + +class Row(object): + """Represents a row in a sheet. Values can be looked up by attribute. + Values can be updated with update(attr=value), which returns the updated row. + Note that a row must have an id to be updatable. Updating is permitted if no id is set + only if id is one of the values being written. + You can also refresh the row (if it has an id) by calling row.refresh(), + which returns the newly read row, or None if it can no longer be found. + """ + + def __init__(self, sheet, schema, index, values): + self.sheet = sheet + self.manager = sheet.manager + self.schema = schema + self.index = index + self.values = values + + def __repr__(self): + return "<{cls.__name__} {self.id}({self.index}) of {self.sheet} at {id:x}>".format(cls=type(self), self=self, id=id(self)) + __str__ = __repr__ + + def __getattr__(self, attr): + col = self.schema[attr] + if len(self.values) > col: + return self.values[col] + return "" + + def update(self, **values): + with self.manager.lock: + self.manager.refresh() + # We attempt to detect races by: + # Always refreshing our position before we begin (if we can) + # Checking our position again afterwards. If it's changed, we probably mis-wrote. + if self.id: + before = self.refresh() + else: + before = self + for name, value in values.items(): + col = self.schema[name] + self.sheet.worksheet.update_cell(before.index, col, value) + after = self.sheet.by_index(before.index) + new_id = values['id'] if 'id' in values else self.id + if after.id != new_id: + raise Exception("Likely bad write: Row {} may have had row {} data partially written: {}".format(after, before, values)) + + def refresh(self): + return self.sheet[self.id] diff --git a/wubloader/wubloader.py b/wubloader/wubloader.py new file mode 100644 index 0000000..a3884b7 --- /dev/null +++ b/wubloader/wubloader.py @@ -0,0 +1,51 @@ + +""" +The central management class which everything else is run from. +Its lifecycle is managed directly by main(). +""" + + +class Wubloader(object): + def __init__(self, config): + self.config = config + + self.id = config.get('name', socket.gethostname()) + self.sheets = SheetsManager(config['sheets'], config['creds']) + + self.stopping = False + self.stopped = gevent.event.Event() + + self.group = gevent.pool.Group() + self.job = None + + self.group.spawn(self._run) + + def stop(self): + """Tell wubloader to gracefully stop by finishing current jobs but starting no new ones.""" + self.stopping = True + + def cancel_all(self): + """Tell wubloader to forcefully stop by cancelling current jobs.""" + if self.job: + self.job.cancel() + + def _run(self): + # clean up in case of prior unclean shutdown + self.cleanup_existing() + + with Heartbeat(self.sheets['heartbeat'], self.name, self.group): + while not self.stopping: + for job in self.find_jobs(): + # TODO if we're not doing it, handle this and continue + # TODO if we're doing it, create Job and set self.job + # TODO wait for it to finish + # TODO break, to check stopping and restart job list from beginning + + # wait for any remaining tasks to finish + self.group.join() + # indicate that we're done + self.stopped.set() + + def cleanup_existing(self): + """Scan for any existing jobs claimed by us, and cancel them.""" + # TODO