diff --git a/sheetsync/sheetsync/main.py b/sheetsync/sheetsync/main.py index da66c2d..6dc8b2c 100644 --- a/sheetsync/sheetsync/main.py +++ b/sheetsync/sheetsync/main.py @@ -53,6 +53,15 @@ event_counts = prom.Gauge( ['sheet_name', 'category', 'poster_moment', 'state', 'errored'], ) + +def wait(event, base, interval): + """Wait until INTERVAL seconds after BASE, or until event is set.""" + now = monotonic() + to_wait = base + common.jitter(interval) - now + if to_wait > 0: + event.wait(to_wait) + + class SheetSync(object): # Time between syncs @@ -75,7 +84,7 @@ class SheetSync(object): # If playlist_worksheet is defined, add 1 to len(worksheets). # For current values, this is 100/5 * 2 + 100/5/4 * 7 = 75 - def __init__(self, stop, dbmanager, sheets, sheet_id, worksheets, edit_url, bustime_start, allocate_ids=False, playlist_worksheet=None): + def __init__(self, stop, dbmanager, sheets, sheet_id, worksheets, edit_url, bustime_start, allocate_ids=False): self.stop = stop self.dbmanager = dbmanager self.sheets = sheets @@ -85,8 +94,6 @@ class SheetSync(object): self.edit_url = edit_url self.bustime_start = bustime_start self.allocate_ids = allocate_ids - # The playlist worksheet is checked on the same cadence as inactive sheets - self.playlist_worksheet = playlist_worksheet # Maps DB column names (or general identifier, for non-DB columns) to sheet column indexes. # Hard-coded for now, future work: determine this from column headers in sheet self.column_map = { @@ -149,13 +156,6 @@ class SheetSync(object): bustime = common.parse_bustime(value) return common.bustime_to_dt(self.bustime_start, bustime) - def wait(self, base, interval): - """Wait until INTERVAL seconds after BASE.""" - now = monotonic() - to_wait = base + common.jitter(interval) - now - if to_wait > 0: - self.stop.wait(to_wait) - def run(self): self.conn = self.dbmanager.get_conn() @@ -173,13 +173,11 @@ class SheetSync(object): if sync_count % self.SYNCS_PER_INACTIVE_CHECK == 0: # check all worksheets worksheets = self.worksheets - playlist_worksheet = self.playlist_worksheet else: # only check most recently changed worksheets worksheets = sorted( self.worksheets.keys(), key=lambda k: self.worksheets[k], reverse=True, )[:self.ACTIVE_SHEET_COUNT] - playlist_worksheet = None sync_count += 1 @@ -214,9 +212,6 @@ class SheetSync(object): self.sync_row(worksheet, row, events.get(row['id'])) - if playlist_worksheet is not None: - rows = self.sheets.get_rows(self.sheet_id, playlist_worksheet) - self.sync_playlists(rows) except Exception as e: # for HTTPErrors, http response body includes the more detailed error detail = '' @@ -229,12 +224,12 @@ class SheetSync(object): # If we can't re-connect, the program will crash from here, # then restart and wait until it can connect again. self.conn = self.dbmanager.get_conn() - self.wait(sync_start, self.ERROR_RETRY_INTERVAL) + wait(self.stop, sync_start, self.ERROR_RETRY_INTERVAL) else: logging.info("Successful sync of worksheets: {}".format(", ".join(worksheets))) sheets_synced.inc() sheet_sync_duration.observe(monotonic() - sync_start) - self.wait(sync_start, self.RETRY_INTERVAL) + wait(self.stop, sync_start, self.RETRY_INTERVAL) def get_events(self): """Return the entire events table as a map {id: event namedtuple}""" @@ -383,6 +378,49 @@ class SheetSync(object): the most-recently-modified queue.""" self.worksheets[worksheet] = monotonic() + +class PlaylistSync: + + # Time between syncs + RETRY_INTERVAL = 20 + + # Time to wait after getting an error + ERROR_RETRY_INTERVAL = 20 + + def __init__(self, stop, dbmanager, sheets, sheet_id, worksheet): + self.stop = stop + self.dbmanager = dbmanager + self.sheets = sheets + self.sheet_id = sheet_id + self.worksheet = worksheet + + def run(self): + self.conn = self.dbmanager.get_conn() + + while not self.stop.is_set(): + try: + sync_start = monotonic() + rows = self.sheets.get_rows(self.sheet_id, self.worksheet) + self.sync_playlists(rows) + except Exception as e: + # for HTTPErrors, http response body includes the more detailed error + detail = '' + if isinstance(e, HTTPError): + detail = ": {}".format(e.response.content) + logging.exception("Failed to sync{}".format(detail)) + sync_errors.inc() + # To ensure a fresh slate and clear any DB-related errors, get a new conn on error. + # This is heavy-handed but simple and effective. + # If we can't re-connect, the program will crash from here, + # then restart and wait until it can connect again. + self.conn = self.dbmanager.get_conn() + wait(self.stop, sync_start, self.ERROR_RETRY_INTERVAL) + else: + logging.info("Successful sync of playlists") + sheets_synced.inc() + sheet_sync_duration.observe(monotonic() - sync_start) + wait(self.stop, sync_start, self.RETRY_INTERVAL) + def sync_playlists(self, rows): """Parse rows with a valid playlist id and at least one tag, overwriting the entire playlists table""" @@ -395,7 +433,7 @@ class SheetSync(object): show_in_description = "" else: continue - tags = self.column_parsers['tags'](tags) + tags = [tag.strip() for tag in tags.split(',') if tag.strip()] if not tags: continue # special-case for the "all everything" list, @@ -484,11 +522,26 @@ def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksh sheets_creds = json.load(open(sheets_creds_file)) sheets = Sheets( - client_id=sheets_creds['client_id'], - client_secret=sheets_creds['client_secret'], - refresh_token=sheets_creds['refresh_token'], + client_id=sheets_creds['client_id'], + client_secret=sheets_creds['client_secret'], + refresh_token=sheets_creds['refresh_token'], ) - SheetSync(stop, dbmanager, sheets, sheet_id, worksheet_names, edit_url, bustime_start, allocate_ids, playlist_worksheet).run() + workers = [ + SheetSync(stop, dbmanager, sheets, sheet_id, worksheet_names, edit_url, bustime_start, allocate_ids), + ] + if playlist_worksheet: + workers.append(PlaylistSync(stop, dbmanager, sheets, sheet_id, playlist_worksheet)) + + jobs = [gevent.spawn(worker.run) for worker in workers] + # Block until any one exits + gevent.wait(jobs, count=1) + # Stop the others if they aren't stopping already + stop.set() + # Block until all have exited + gevent.wait(jobs) + # Call get() for each one to re-raise if any errored + for job in jobs: + job.get() logging.info("Gracefully stopped")