diff --git a/sheetsync/sheetsync/main.py b/sheetsync/sheetsync/main.py index 5dabe5f..e018f67 100644 --- a/sheetsync/sheetsync/main.py +++ b/sheetsync/sheetsync/main.py @@ -77,6 +77,7 @@ class SheetSync(object): self.create_missing_ids = False # List of input columns self.input_columns = [ + 'sheet_name', 'event_start', 'event_end', 'category', @@ -123,7 +124,7 @@ class SheetSync(object): self.sync_row(row, events.get(row['id'])) for event in [e for id, e in events.items() if id not in seen]: - self.sync_row(event["sheet_name"], None, event) + self.sync_row(None, event) except Exception as e: # for HTTPErrors, http response body includes the more detailed error @@ -181,7 +182,7 @@ class SheetSync(object): logging.info("Inserting new event {}".format(row['id'])) # Insertion conflict just means that another sheet sync beat us to the insert. # We can ignore it. - insert_cols = ['id', 'sheet_name'] + self.input_columns + insert_cols = ['id'] + self.input_columns built_query = sql.SQL(""" INSERT INTO events ({}) VALUES ({}) @@ -190,7 +191,7 @@ class SheetSync(object): sql.SQL(", ").join(sql.Identifier(col) for col in insert_cols), sql.SQL(", ").join(get_column_placeholder(col) for col in insert_cols), ) - query(self.conn, built_query, sheet_name=worksheet, **row) + query(self.conn, built_query, **row) rows_found.labels(worksheet).inc() rows_changed.labels('insert', worksheet).inc() self.middleware.mark_modified(worksheet) @@ -407,7 +408,7 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0): config["worksheets"], common.dateutil.parse(config["bustime_start"]), config["edit_url"], - config["allocate_ids"], + config.get("allocate_ids", False), ) if "playlist_worksheet" in config: workers.append( @@ -424,7 +425,7 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0): else: raise ValueError("Unknown type {!r}".format(config["type"])) workers.append( - SheetSync(middleware, stop, dbmanager, config["reverse_sync"]), + SheetSync(middleware, stop, dbmanager, config.get("reverse_sync", False)), ) jobs = [gevent.spawn(worker.run) for worker in workers] diff --git a/sheetsync/sheetsync/streamlog.py b/sheetsync/sheetsync/streamlog.py index 0b84695..b655401 100644 --- a/sheetsync/sheetsync/streamlog.py +++ b/sheetsync/sheetsync/streamlog.py @@ -1,5 +1,6 @@ import json +import logging import requests @@ -19,11 +20,14 @@ class StreamLogClient(): return self.request("GET", "event", self.event_id, "log") def write_value(self, row_id, key, value): - """Write key=value for the given row""" - return self.request("POST", "entry", row_id, key, body=value) + """Write key=value for the given row, or delete if value=None""" + if value is None: + return self.request("DELETE", "entry", row_id, key) + else: + return self.request("POST", "entry", row_id, key, body=value) def request(self, method, *path, body=None): - response = self.session.request(method, "/".join(("api", "v1") + path), + response = self.session.request(method, "/".join((self.url, "api", "v1") + path), data=body, headers={ "Authorization": self.auth_token, @@ -47,12 +51,11 @@ class StreamLogMiddleware: 'description': 'description', 'submitter_winner': 'submitter_or_winner', 'poster_moment': 'poster_moment', - 'image_links': 'media_link', + 'image_links': 'media_links', 'notes': 'notes_to_editor', 'tags': 'tags', 'video_link': 'video_link', 'state': 'video_state', - 'edit_link': 'editor_link', 'error': 'video_errors', 'id': 'id', } @@ -60,31 +63,47 @@ class StreamLogMiddleware: # Omitted columns act as the identity function. self.column_decode = { 'event_start': parse_utc_only, - 'event_end': lambda v: None if v["type"] == "NoTime" else parse_utc_only(v["time"]), + # New, to switch to. + # 'event_end': lambda v: parse_utc_only(v["time"]) if v["type"] == "Time" else None, + # Old + 'event_end': lambda v: None if v is None else parse_utc_only(v), 'category': lambda v: v["name"], - 'image_links': lambda v: [link.strip() for link in v.split()] if v.strip() else [], - 'state': lambda v: v.upper(), + 'state': lambda v: None if v is None else v.upper(), + 'video_link': lambda v: '' if v is None else v, } # Maps DB column names to an encode function to convert from internal format to streamlog. # Omitted columns act as the identity function. self.column_encode = { 'state': lambda v: v[0].upper() + v[1:].lower(), # Titlecase } + # Maps DB column names to the url part you need to write to to set it. + self.write_map = { + "state": "video_state", + "error": "video_errors", + "video_link": "video", + } def get_rows(self): for row in self.client.get_rows()["event_log"]: - yield self.parse_row(row) + row = self.parse_row(row) + # Malformed rows can be skipped, represented as a None result + if row is not None: + yield row def parse_row(self, row): output = {} for column, key in self.column_map.items(): value = row[key] if column in self.column_decode: - value = self.column_decode[column](value) + try: + value = self.column_decode[column](value) + except Exception: + logging.exception(f"Failed to parse {key} value {value!r} of row {row['id']}, skipping") + return output[column] = value # Section name is sheet name - output["sheet_name"] = row["section"]["name"] + output["sheet_name"] = row["section"]["name"] if row["section"] else "unknown" # Implicit tags output['tags'] += [ @@ -99,7 +118,7 @@ class StreamLogMiddleware: def write_value(self, row, key, value): if key in self.column_encode: value = self.column_encode[key](value) - self.client.write_value(row["id"], key, value) + self.client.write_value(row["id"], self.write_map[key], value) def mark_modified(self, row): pass # not a concept we have