|
|
|
@ -53,6 +53,15 @@ event_counts = prom.Gauge(
|
|
|
|
|
['sheet_name', 'category', 'poster_moment', 'state', 'errored'],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def wait(event, base, interval):
|
|
|
|
|
"""Wait until INTERVAL seconds after BASE, or until event is set."""
|
|
|
|
|
now = monotonic()
|
|
|
|
|
to_wait = base + common.jitter(interval) - now
|
|
|
|
|
if to_wait > 0:
|
|
|
|
|
event.wait(to_wait)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SheetSync(object):
|
|
|
|
|
|
|
|
|
|
# Time between syncs
|
|
|
|
@ -75,7 +84,7 @@ class SheetSync(object):
|
|
|
|
|
# If playlist_worksheet is defined, add 1 to len(worksheets).
|
|
|
|
|
# For current values, this is 100/5 * 2 + 100/5/4 * 7 = 75
|
|
|
|
|
|
|
|
|
|
def __init__(self, stop, dbmanager, sheets, sheet_id, worksheets, edit_url, bustime_start, allocate_ids=False, playlist_worksheet=None):
|
|
|
|
|
def __init__(self, stop, dbmanager, sheets, sheet_id, worksheets, edit_url, bustime_start, allocate_ids=False):
|
|
|
|
|
self.stop = stop
|
|
|
|
|
self.dbmanager = dbmanager
|
|
|
|
|
self.sheets = sheets
|
|
|
|
@ -85,8 +94,6 @@ class SheetSync(object):
|
|
|
|
|
self.edit_url = edit_url
|
|
|
|
|
self.bustime_start = bustime_start
|
|
|
|
|
self.allocate_ids = allocate_ids
|
|
|
|
|
# The playlist worksheet is checked on the same cadence as inactive sheets
|
|
|
|
|
self.playlist_worksheet = playlist_worksheet
|
|
|
|
|
# Maps DB column names (or general identifier, for non-DB columns) to sheet column indexes.
|
|
|
|
|
# Hard-coded for now, future work: determine this from column headers in sheet
|
|
|
|
|
self.column_map = {
|
|
|
|
@ -149,13 +156,6 @@ class SheetSync(object):
|
|
|
|
|
bustime = common.parse_bustime(value)
|
|
|
|
|
return common.bustime_to_dt(self.bustime_start, bustime)
|
|
|
|
|
|
|
|
|
|
def wait(self, base, interval):
|
|
|
|
|
"""Wait until INTERVAL seconds after BASE."""
|
|
|
|
|
now = monotonic()
|
|
|
|
|
to_wait = base + common.jitter(interval) - now
|
|
|
|
|
if to_wait > 0:
|
|
|
|
|
self.stop.wait(to_wait)
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
self.conn = self.dbmanager.get_conn()
|
|
|
|
|
|
|
|
|
@ -173,13 +173,11 @@ class SheetSync(object):
|
|
|
|
|
if sync_count % self.SYNCS_PER_INACTIVE_CHECK == 0:
|
|
|
|
|
# check all worksheets
|
|
|
|
|
worksheets = self.worksheets
|
|
|
|
|
playlist_worksheet = self.playlist_worksheet
|
|
|
|
|
else:
|
|
|
|
|
# only check most recently changed worksheets
|
|
|
|
|
worksheets = sorted(
|
|
|
|
|
self.worksheets.keys(), key=lambda k: self.worksheets[k], reverse=True,
|
|
|
|
|
)[:self.ACTIVE_SHEET_COUNT]
|
|
|
|
|
playlist_worksheet = None
|
|
|
|
|
|
|
|
|
|
sync_count += 1
|
|
|
|
|
|
|
|
|
@ -214,9 +212,6 @@ class SheetSync(object):
|
|
|
|
|
|
|
|
|
|
self.sync_row(worksheet, row, events.get(row['id']))
|
|
|
|
|
|
|
|
|
|
if playlist_worksheet is not None:
|
|
|
|
|
rows = self.sheets.get_rows(self.sheet_id, playlist_worksheet)
|
|
|
|
|
self.sync_playlists(rows)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# for HTTPErrors, http response body includes the more detailed error
|
|
|
|
|
detail = ''
|
|
|
|
@ -229,12 +224,12 @@ class SheetSync(object):
|
|
|
|
|
# If we can't re-connect, the program will crash from here,
|
|
|
|
|
# then restart and wait until it can connect again.
|
|
|
|
|
self.conn = self.dbmanager.get_conn()
|
|
|
|
|
self.wait(sync_start, self.ERROR_RETRY_INTERVAL)
|
|
|
|
|
wait(self.stop, sync_start, self.ERROR_RETRY_INTERVAL)
|
|
|
|
|
else:
|
|
|
|
|
logging.info("Successful sync of worksheets: {}".format(", ".join(worksheets)))
|
|
|
|
|
sheets_synced.inc()
|
|
|
|
|
sheet_sync_duration.observe(monotonic() - sync_start)
|
|
|
|
|
self.wait(sync_start, self.RETRY_INTERVAL)
|
|
|
|
|
wait(self.stop, sync_start, self.RETRY_INTERVAL)
|
|
|
|
|
|
|
|
|
|
def get_events(self):
|
|
|
|
|
"""Return the entire events table as a map {id: event namedtuple}"""
|
|
|
|
@ -383,6 +378,49 @@ class SheetSync(object):
|
|
|
|
|
the most-recently-modified queue."""
|
|
|
|
|
self.worksheets[worksheet] = monotonic()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PlaylistSync:
|
|
|
|
|
|
|
|
|
|
# Time between syncs
|
|
|
|
|
RETRY_INTERVAL = 20
|
|
|
|
|
|
|
|
|
|
# Time to wait after getting an error
|
|
|
|
|
ERROR_RETRY_INTERVAL = 20
|
|
|
|
|
|
|
|
|
|
def __init__(self, stop, dbmanager, sheets, sheet_id, worksheet):
|
|
|
|
|
self.stop = stop
|
|
|
|
|
self.dbmanager = dbmanager
|
|
|
|
|
self.sheets = sheets
|
|
|
|
|
self.sheet_id = sheet_id
|
|
|
|
|
self.worksheet = worksheet
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
self.conn = self.dbmanager.get_conn()
|
|
|
|
|
|
|
|
|
|
while not self.stop.is_set():
|
|
|
|
|
try:
|
|
|
|
|
sync_start = monotonic()
|
|
|
|
|
rows = self.sheets.get_rows(self.sheet_id, self.worksheet)
|
|
|
|
|
self.sync_playlists(rows)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# for HTTPErrors, http response body includes the more detailed error
|
|
|
|
|
detail = ''
|
|
|
|
|
if isinstance(e, HTTPError):
|
|
|
|
|
detail = ": {}".format(e.response.content)
|
|
|
|
|
logging.exception("Failed to sync{}".format(detail))
|
|
|
|
|
sync_errors.inc()
|
|
|
|
|
# To ensure a fresh slate and clear any DB-related errors, get a new conn on error.
|
|
|
|
|
# This is heavy-handed but simple and effective.
|
|
|
|
|
# If we can't re-connect, the program will crash from here,
|
|
|
|
|
# then restart and wait until it can connect again.
|
|
|
|
|
self.conn = self.dbmanager.get_conn()
|
|
|
|
|
wait(self.stop, sync_start, self.ERROR_RETRY_INTERVAL)
|
|
|
|
|
else:
|
|
|
|
|
logging.info("Successful sync of playlists")
|
|
|
|
|
sheets_synced.inc()
|
|
|
|
|
sheet_sync_duration.observe(monotonic() - sync_start)
|
|
|
|
|
wait(self.stop, sync_start, self.RETRY_INTERVAL)
|
|
|
|
|
|
|
|
|
|
def sync_playlists(self, rows):
|
|
|
|
|
"""Parse rows with a valid playlist id and at least one tag,
|
|
|
|
|
overwriting the entire playlists table"""
|
|
|
|
@ -395,7 +433,7 @@ class SheetSync(object):
|
|
|
|
|
show_in_description = ""
|
|
|
|
|
else:
|
|
|
|
|
continue
|
|
|
|
|
tags = self.column_parsers['tags'](tags)
|
|
|
|
|
tags = [tag.strip() for tag in tags.split(',') if tag.strip()]
|
|
|
|
|
if not tags:
|
|
|
|
|
continue
|
|
|
|
|
# special-case for the "all everything" list,
|
|
|
|
@ -484,11 +522,26 @@ def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksh
|
|
|
|
|
sheets_creds = json.load(open(sheets_creds_file))
|
|
|
|
|
|
|
|
|
|
sheets = Sheets(
|
|
|
|
|
client_id=sheets_creds['client_id'],
|
|
|
|
|
client_secret=sheets_creds['client_secret'],
|
|
|
|
|
refresh_token=sheets_creds['refresh_token'],
|
|
|
|
|
client_id=sheets_creds['client_id'],
|
|
|
|
|
client_secret=sheets_creds['client_secret'],
|
|
|
|
|
refresh_token=sheets_creds['refresh_token'],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
SheetSync(stop, dbmanager, sheets, sheet_id, worksheet_names, edit_url, bustime_start, allocate_ids, playlist_worksheet).run()
|
|
|
|
|
workers = [
|
|
|
|
|
SheetSync(stop, dbmanager, sheets, sheet_id, worksheet_names, edit_url, bustime_start, allocate_ids),
|
|
|
|
|
]
|
|
|
|
|
if playlist_worksheet:
|
|
|
|
|
workers.append(PlaylistSync(stop, dbmanager, sheets, sheet_id, playlist_worksheet))
|
|
|
|
|
|
|
|
|
|
jobs = [gevent.spawn(worker.run) for worker in workers]
|
|
|
|
|
# Block until any one exits
|
|
|
|
|
gevent.wait(jobs, count=1)
|
|
|
|
|
# Stop the others if they aren't stopping already
|
|
|
|
|
stop.set()
|
|
|
|
|
# Block until all have exited
|
|
|
|
|
gevent.wait(jobs)
|
|
|
|
|
# Call get() for each one to re-raise if any errored
|
|
|
|
|
for job in jobs:
|
|
|
|
|
job.get()
|
|
|
|
|
|
|
|
|
|
logging.info("Gracefully stopped")
|
|
|
|
|