From 29ff11457e2e2b4c609c30f708ae7cc29a70af28 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Mon, 12 Aug 2024 12:17:50 +1000
Subject: [PATCH] sheetsync: Namespace all logs and metrics behind a sheetsync
 "name"

This helps differentiate the multiple syncs we now have and will have:
- syncing events from streamlog
- reverse syncing events to sheets
- syncing playlists
---
 sheetsync/sheetsync/main.py | 57 ++++++++++++++++++++-----------
 1 file changed, 31 insertions(+), 26 deletions(-)

diff --git a/sheetsync/sheetsync/main.py b/sheetsync/sheetsync/main.py
index 69db5e8..aa43544 100644
--- a/sheetsync/sheetsync/main.py
+++ b/sheetsync/sheetsync/main.py
@@ -23,34 +23,37 @@ from .streamlog import StreamLogClient, StreamLogMiddleware
 sheets_synced = prom.Counter(
 	'sheets_synced',
 	'Number of successful sheet syncs',
+	['name'],
 )
 
 sheet_sync_duration = prom.Histogram(
 	'sheet_sync_duration',
 	'Time taken to complete a sheet sync',
+	['name'],
 )
 
 sync_errors = prom.Counter(
 	'sync_errors',
 	'Number of errors syncing sheets',
+	['name'],
 )
 
 rows_found = prom.Counter(
 	'rows_found',
 	'Number of rows that sheetsync looked at with an id',
-	['worksheet'],
+	['name', 'worksheet'],
 )
 
 rows_changed = prom.Counter(
 	'rows_changed',
 	'Number of rows that needed changes applied, with type=insert, type=input or type=output',
-	['type', 'worksheet'],
+	['name', 'type', 'worksheet'],
 )
 
 event_counts = prom.Gauge(
 	'event_counts',
 	'Number of rows in the database',
-	['sheet_name', 'category', 'poster_moment', 'state', 'errored'],
+	['name', 'sheet_name', 'category', 'poster_moment', 'state', 'errored'],
 )
 
 
@@ -70,7 +73,9 @@ class SheetSync(object):
 	# Time to wait after getting an error
 	ERROR_RETRY_INTERVAL = 10
 
-	def __init__(self, middleware, stop, dbmanager, reverse_sync=False):
+	def __init__(self, name, middleware, stop, dbmanager, reverse_sync=False):
+		self.name = name
+		self.logger = logging.getLogger(type(self).__name__).getChild(name)
 		self.middleware = middleware
 		self.stop = stop
 		self.dbmanager = dbmanager
@@ -117,7 +122,7 @@
 
 		for row in self.middleware.get_rows():
 			if row['id'] in seen:
-				logging.error("Duplicate id {}, skipping".format(row['id']))
+				self.logger.error("Duplicate id {}, skipping".format(row['id']))
 				continue
 			seen.add(row['id'])
 			self.sync_row(row, db_rows.get(row['id']))
@@ -130,8 +135,8 @@
 			detail = ''
 			if isinstance(e, HTTPError):
 				detail = ": {}".format(e.response.content)
-			logging.exception("Failed to sync{}".format(detail))
-			sync_errors.inc()
+			self.logger.exception("Failed to sync{}".format(detail))
+			sync_errors.labels(self.name).inc()
 			# To ensure a fresh slate and clear any DB-related errors, get a new conn on error.
 			# This is heavy-handed but simple and effective.
 			# If we can't re-connect, the program will crash from here,
@@ -139,9 +144,9 @@
 			self.conn = self.dbmanager.get_conn()
 			wait(self.stop, sync_start, self.ERROR_RETRY_INTERVAL)
 		else:
-			logging.info("Successful sync")
-			sheets_synced.inc()
-			sheet_sync_duration.observe(monotonic() - sync_start)
+			self.logger.info("Successful sync")
+			sheets_synced.labels(self.name).inc()
+			sheet_sync_duration.labels(self.name).observe(monotonic() - sync_start)
 		wait(self.stop, sync_start, self.RETRY_INTERVAL)
 
 	def get_db_rows(self):
@@ -165,7 +170,7 @@
 		# or else any values we don't update will remain as a stale count.
 		event_counts._metrics.clear()
 		for labels, count in counts.items():
-			event_counts.labels(*labels).set(count)
+			event_counts.labels(self.name, *labels).set(count)
 		return by_id
 
 	def sync_row(self, sheet_row, db_row):
@@ -178,7 +183,7 @@
 			assert sheet_row
 			worksheet = sheet_row["sheet_name"]
 			# No row currently in DB, create it.
-			logging.info("Inserting new DB row {}".format(sheet_row['id']))
+			self.logger.info("Inserting new DB row {}".format(sheet_row['id']))
 			# Insertion conflict just means that another sheet sync beat us to the insert.
 			# We can ignore it.
 			insert_cols = ['id', 'sheet_name'] + self.input_columns
@@ -191,21 +196,21 @@
 				sql.SQL(", ").join(get_column_placeholder(col) for col in insert_cols),
 			)
 			query(self.conn, built_query, **sheet_row)
-			rows_found.labels(worksheet).inc()
-			rows_changed.labels('insert', worksheet).inc()
+			rows_found.labels(self.name, worksheet).inc()
+			rows_changed.labels(self.name, 'insert', worksheet).inc()
 			self.middleware.mark_modified(sheet_row)
 			return
 
 		if sheet_row is None:
 			assert db_row
 			if not self.create_missing_ids:
-				logging.info("Skipping db row {} without any matching sheet row".format(db_row.id))
+				self.logger.info("Skipping db row {} without any matching sheet row".format(db_row.id))
 				return
-			logging.info("Adding new row {}".format(db_row.id))
+			self.logger.info("Adding new row {}".format(db_row.id))
 			sheet_row = self.middleware.create_row(db_row.sheet_name, db_row.id)
 			worksheet = sheet_row["sheet_name"]
 
-		rows_found.labels(worksheet).inc()
+		rows_found.labels(self.name, worksheet).inc()
 
 		# If no database error, but we have parse errors, indicate they should be displayed.
 		if db_row.error is None and sheet_row.get('_parse_errors'):
@@ -220,7 +225,7 @@
 		# Update database with any changed inputs
 		changed = [col for col in self.input_columns if sheet_row.get(col) != getattr(db_row, col)]
 		if changed:
-			logging.info("Updating db row {} with new value(s) for {}".format(
+			self.logger.info("Updating db row {} with new value(s) for {}".format(
 				sheet_row['id'], ', '.join(changed)
 			))
 			built_query = sql.SQL("""
@@ -233,21 +238,21 @@
 				) for col in changed
 			))
 			query(self.conn, built_query, **sheet_row)
-			rows_changed.labels('input', worksheet).inc()
+			rows_changed.labels(self.name, 'input', worksheet).inc()
 			self.middleware.mark_modified(sheet_row)
 
 		# Update sheet with any changed outputs
 		changed = [col for col in self.output_columns if sheet_row.get(col) != getattr(db_row, col)]
 		if changed:
-			logging.info("Updating sheet row {} with new value(s) for {}".format(
+			self.logger.info("Updating sheet row {} with new value(s) for {}".format(
 				sheet_row['id'], ', '.join(changed)
 			))
 			for col in changed:
-				logging.debug("Writing to sheet {} {!r} -> {!r}".format(col, sheet_row.get(col), getattr(db_row, col)))
+				self.logger.debug("Writing to sheet {} {!r} -> {!r}".format(col, sheet_row.get(col), getattr(db_row, col)))
 				self.middleware.write_value(
 					sheet_row, col, getattr(db_row, col),
 				)
-			rows_changed.labels('output', worksheet).inc()
+			rows_changed.labels(self.name, 'output', worksheet).inc()
 			self.middleware.mark_modified(sheet_row)
 
 
@@ -280,7 +285,7 @@ class PlaylistSync:
 			if isinstance(e, HTTPError):
 				detail = ": {}".format(e.response.content)
 			logging.exception("Failed to sync{}".format(detail))
-			sync_errors.inc()
+			sync_errors.labels("playlists").inc()
 			# To ensure a fresh slate and clear any DB-related errors, get a new conn on error.
 			# This is heavy-handed but simple and effective.
 			# If we can't re-connect, the program will crash from here,
@@ -289,8 +294,8 @@ class PlaylistSync:
 			wait(self.stop, sync_start, self.ERROR_RETRY_INTERVAL)
 		else:
 			logging.info("Successful sync of playlists")
-			sheets_synced.inc()
-			sheet_sync_duration.observe(monotonic() - sync_start)
+			sheets_synced.labels("playlists").inc()
+			sheet_sync_duration.labels("playlists").observe(monotonic() - sync_start)
 		wait(self.stop, sync_start, self.RETRY_INTERVAL)
 
 	def sync_playlists(self, rows):
@@ -424,7 +429,7 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
 		else:
 			raise ValueError("Unknown type {!r}".format(config["type"]))
 		workers.append(
-			SheetSync(middleware, stop, dbmanager, config.get("reverse_sync", False)),
+			SheetSync(config["type"], middleware, stop, dbmanager, config.get("reverse_sync", False)),
 		)
 
 	jobs = [gevent.spawn(worker.run) for worker in workers]
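
A note for readers unfamiliar with prometheus_client label semantics, sketching the pattern this patch applies throughout. This is an illustration only, not part of the patch: DemoSync and demo_synced are hypothetical stand-ins for SheetSync and the real metrics, though prom.Counter(..., ['name']) and getLogger(...).getChild(name) are the same calls the patch uses. Declaring a metric with a 'name' label gives each sync worker its own time series, and the child logger namespaces log records the same way:

# sketch.py -- minimal demo of per-name metrics and loggers
# (assumes prometheus_client is installed; all names are hypothetical).
import logging

import prometheus_client as prom

# With a 'name' label declared, each label value becomes a separate
# time series, e.g. demo_synced_total{name="sheets"} vs {name="playlists"}.
demo_synced = prom.Counter(
	'demo_synced',
	'Number of successful syncs (stand-in for sheets_synced)',
	['name'],
)

class DemoSync(object):
	def __init__(self, name):
		self.name = name
		# Records are attributed to e.g. "DemoSync.sheets".
		self.logger = logging.getLogger(type(self).__name__).getChild(name)

	def sync_once(self):
		# Once labels are declared, increments must go through .labels();
		# a bare demo_synced.inc() raises ValueError.
		demo_synced.labels(self.name).inc()
		self.logger.info("Successful sync")

logging.basicConfig(level=logging.INFO, format="%(name)s: %(message)s")
DemoSync("sheets").sync_once()
DemoSync("playlists").sync_once()

With the label in place, dashboards and alerts can split per sync, e.g. sum by (name) (rate(sync_errors_total[5m])) in PromQL (recent prometheus_client versions expose counters with a _total suffix).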