|
|
|
@ -112,7 +112,7 @@ class SheetSync(object):
|
|
|
|
|
# Since the full dataset is small, the cost of round tripping to the database to check
|
|
|
|
|
# each row is more expensive than the cost of just grabbing the entire table
|
|
|
|
|
# and comparing locally.
|
|
|
|
|
events = self.get_events()
|
|
|
|
|
db_rows = self.get_db_rows()
|
|
|
|
|
seen = set()
|
|
|
|
|
|
|
|
|
|
for row in self.middleware.get_rows():
|
|
|
|
@ -120,10 +120,10 @@ class SheetSync(object):
|
|
|
|
|
logging.error("Duplicate id {}, skipping".format(row['id']))
|
|
|
|
|
continue
|
|
|
|
|
seen.add(row['id'])
|
|
|
|
|
self.sync_row(row, events.get(row['id']))
|
|
|
|
|
self.sync_row(row, db_rows.get(row['id']))
|
|
|
|
|
|
|
|
|
|
for event in [e for id, e in events.items() if id not in seen]:
|
|
|
|
|
self.sync_row(None, event)
|
|
|
|
|
for db_row in [r for id, r in db_rows.items() if id not in seen]:
|
|
|
|
|
self.sync_row(None, db_row)
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# for HTTPErrors, http response body includes the more detailed error
|
|
|
|
@ -144,10 +144,10 @@ class SheetSync(object):
|
|
|
|
|
sheet_sync_duration.observe(monotonic() - sync_start)
|
|
|
|
|
wait(self.stop, sync_start, self.RETRY_INTERVAL)
|
|
|
|
|
|
|
|
|
|
def get_events(self):
|
|
|
|
|
"""Return the entire events table as a map {id: event namedtuple}"""
|
|
|
|
|
def get_db_rows(self):
|
|
|
|
|
"""Return the entire events table as a map {id: row namedtuple}"""
|
|
|
|
|
built_query = sql.SQL("""
|
|
|
|
|
SELECT {} FROM EVENTS
|
|
|
|
|
SELECT {} FROM events
|
|
|
|
|
""").format(
|
|
|
|
|
sql.SQL(", ").join(sql.Identifier(col) for col in
|
|
|
|
|
{ "id", "state", "error", "public", "poster_moment", "sheet_name", "category" }
|
|
|
|
@ -168,17 +168,17 @@ class SheetSync(object):
|
|
|
|
|
event_counts.labels(*labels).set(count)
|
|
|
|
|
return by_id
|
|
|
|
|
|
|
|
|
|
def sync_row(self, row, event):
|
|
|
|
|
"""Take a row dict (or None) and an Event from the database (or None)
|
|
|
|
|
def sync_row(self, sheet_row, db_row):
|
|
|
|
|
"""Take a row dict from the sheet (or None) and a row namedtuple from the database (or None)
|
|
|
|
|
and take whatever action is required to sync them, ie. writing to the database or sheet.
|
|
|
|
|
At least one of row and event must be non-None.
|
|
|
|
|
At least one must be non-None.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
if event is None:
|
|
|
|
|
assert row
|
|
|
|
|
worksheet = row["sheet_name"]
|
|
|
|
|
# No event currently in DB, create it.
|
|
|
|
|
logging.info("Inserting new event {}".format(row['id']))
|
|
|
|
|
if db_row is None:
|
|
|
|
|
assert sheet_row
|
|
|
|
|
worksheet = sheet_row["sheet_name"]
|
|
|
|
|
# No row currently in DB, create it.
|
|
|
|
|
logging.info("Inserting new DB row {}".format(sheet_row['id']))
|
|
|
|
|
# Insertion conflict just means that another sheet sync beat us to the insert.
|
|
|
|
|
# We can ignore it.
|
|
|
|
|
insert_cols = ['id', 'sheet_name'] + self.input_columns
|
|
|
|
@ -190,38 +190,38 @@ class SheetSync(object):
|
|
|
|
|
sql.SQL(", ").join(sql.Identifier(col) for col in insert_cols),
|
|
|
|
|
sql.SQL(", ").join(get_column_placeholder(col) for col in insert_cols),
|
|
|
|
|
)
|
|
|
|
|
query(self.conn, built_query, **row)
|
|
|
|
|
query(self.conn, built_query, **sheet_row)
|
|
|
|
|
rows_found.labels(worksheet).inc()
|
|
|
|
|
rows_changed.labels('insert', worksheet).inc()
|
|
|
|
|
self.middleware.mark_modified(row)
|
|
|
|
|
self.middleware.mark_modified(sheet_row)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if row is None:
|
|
|
|
|
assert event
|
|
|
|
|
if sheet_row is None:
|
|
|
|
|
assert db_row
|
|
|
|
|
if not self.create_missing_ids:
|
|
|
|
|
logging.info("Skipping event {} without any matching row".format(event.id))
|
|
|
|
|
logging.info("Skipping db row {} without any matching sheet row".format(db_row.id))
|
|
|
|
|
return
|
|
|
|
|
logging.info("Adding new row {}".format(event.id))
|
|
|
|
|
row = self.middleware.create_row(event.sheet_name, event.id)
|
|
|
|
|
logging.info("Adding new row {}".format(db_row.id))
|
|
|
|
|
sheet_row = self.middleware.create_row(db_row.sheet_name, db_row.id)
|
|
|
|
|
|
|
|
|
|
worksheet = row["sheet_name"]
|
|
|
|
|
worksheet = sheet_row["sheet_name"]
|
|
|
|
|
rows_found.labels(worksheet).inc()
|
|
|
|
|
|
|
|
|
|
# If no database error, but we have parse errors, indicate they should be displayed.
|
|
|
|
|
if event.error is None and row.get('_parse_errors'):
|
|
|
|
|
event = event._replace(error=", ".join(row['_parse_errors']))
|
|
|
|
|
if db_row.error is None and sheet_row.get('_parse_errors'):
|
|
|
|
|
db_row = db_row._replace(error=", ".join(sheet_row['_parse_errors']))
|
|
|
|
|
|
|
|
|
|
# As a presentation detail, we show any row in state DONE with public = False as
|
|
|
|
|
# a virtual state UNLISTED instead, to indicate that it probably still requires other
|
|
|
|
|
# work before being modified to be public = True later.
|
|
|
|
|
if event.state == 'DONE' and not event.public:
|
|
|
|
|
event = event._replace(state='UNLISTED')
|
|
|
|
|
if db_row.state == 'DONE' and not db_row.public:
|
|
|
|
|
db_row = db_row._replace(state='UNLISTED')
|
|
|
|
|
|
|
|
|
|
# Update database with any changed inputs
|
|
|
|
|
changed = [col for col in self.input_columns if row.get(col) != getattr(event, col)]
|
|
|
|
|
changed = [col for col in self.input_columns if sheet_row.get(col) != getattr(db_row, col)]
|
|
|
|
|
if changed:
|
|
|
|
|
logging.info("Updating event {} with new value(s) for {}".format(
|
|
|
|
|
row['id'], ', '.join(changed)
|
|
|
|
|
logging.info("Updating db row {} with new value(s) for {}".format(
|
|
|
|
|
sheet_row['id'], ', '.join(changed)
|
|
|
|
|
))
|
|
|
|
|
built_query = sql.SQL("""
|
|
|
|
|
UPDATE events
|
|
|
|
@ -232,23 +232,23 @@ class SheetSync(object):
|
|
|
|
|
sql.Identifier(col), get_column_placeholder(col)
|
|
|
|
|
) for col in changed
|
|
|
|
|
))
|
|
|
|
|
query(self.conn, built_query, **row)
|
|
|
|
|
query(self.conn, built_query, **sheet_row)
|
|
|
|
|
rows_changed.labels('input', worksheet).inc()
|
|
|
|
|
self.middleware.mark_modified(row)
|
|
|
|
|
self.middleware.mark_modified(sheet_row)
|
|
|
|
|
|
|
|
|
|
# Update sheet with any changed outputs
|
|
|
|
|
changed = [col for col in self.output_columns if row.get(col) != getattr(event, col)]
|
|
|
|
|
changed = [col for col in self.output_columns if sheet_row.get(col) != getattr(db_row, col)]
|
|
|
|
|
if changed:
|
|
|
|
|
logging.info("Updating sheet row {} with new value(s) for {}".format(
|
|
|
|
|
row['id'], ', '.join(changed)
|
|
|
|
|
sheet_row['id'], ', '.join(changed)
|
|
|
|
|
))
|
|
|
|
|
for col in changed:
|
|
|
|
|
logging.debug("Writing to sheet {} {!r} -> {!r}".format(col, row.get(col), getattr(event, col)))
|
|
|
|
|
logging.debug("Writing to sheet {} {!r} -> {!r}".format(col, sheet_row.get(col), getattr(db_row, col)))
|
|
|
|
|
self.middleware.write_value(
|
|
|
|
|
row, col, getattr(event, col),
|
|
|
|
|
sheet_row, col, getattr(db_row, col),
|
|
|
|
|
)
|
|
|
|
|
rows_changed.labels('output', worksheet).inc()
|
|
|
|
|
self.middleware.mark_modified(row)
|
|
|
|
|
self.middleware.mark_modified(sheet_row)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PlaylistSync:
|
|
|
|
|