wip: fixes

pull/401/head
Mike Lang 1 year ago committed by Mike Lang
parent eebfa5885b
commit 3e873ca5f6

@ -77,6 +77,7 @@ class SheetSync(object):
self.create_missing_ids = False self.create_missing_ids = False
# List of input columns # List of input columns
self.input_columns = [ self.input_columns = [
'sheet_name',
'event_start', 'event_start',
'event_end', 'event_end',
'category', 'category',
@ -123,7 +124,7 @@ class SheetSync(object):
self.sync_row(row, events.get(row['id'])) self.sync_row(row, events.get(row['id']))
for event in [e for id, e in events.items() if id not in seen]: for event in [e for id, e in events.items() if id not in seen]:
self.sync_row(event["sheet_name"], None, event) self.sync_row(None, event)
except Exception as e: except Exception as e:
# for HTTPErrors, http response body includes the more detailed error # for HTTPErrors, http response body includes the more detailed error
@ -181,7 +182,7 @@ class SheetSync(object):
logging.info("Inserting new event {}".format(row['id'])) logging.info("Inserting new event {}".format(row['id']))
# Insertion conflict just means that another sheet sync beat us to the insert. # Insertion conflict just means that another sheet sync beat us to the insert.
# We can ignore it. # We can ignore it.
insert_cols = ['id', 'sheet_name'] + self.input_columns insert_cols = ['id'] + self.input_columns
built_query = sql.SQL(""" built_query = sql.SQL("""
INSERT INTO events ({}) INSERT INTO events ({})
VALUES ({}) VALUES ({})
@ -190,7 +191,7 @@ class SheetSync(object):
sql.SQL(", ").join(sql.Identifier(col) for col in insert_cols), sql.SQL(", ").join(sql.Identifier(col) for col in insert_cols),
sql.SQL(", ").join(get_column_placeholder(col) for col in insert_cols), sql.SQL(", ").join(get_column_placeholder(col) for col in insert_cols),
) )
query(self.conn, built_query, sheet_name=worksheet, **row) query(self.conn, built_query, **row)
rows_found.labels(worksheet).inc() rows_found.labels(worksheet).inc()
rows_changed.labels('insert', worksheet).inc() rows_changed.labels('insert', worksheet).inc()
self.middleware.mark_modified(worksheet) self.middleware.mark_modified(worksheet)
@ -407,7 +408,7 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
config["worksheets"], config["worksheets"],
common.dateutil.parse(config["bustime_start"]), common.dateutil.parse(config["bustime_start"]),
config["edit_url"], config["edit_url"],
config["allocate_ids"], config.get("allocate_ids", False),
) )
if "playlist_worksheet" in config: if "playlist_worksheet" in config:
workers.append( workers.append(
@ -424,7 +425,7 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
else: else:
raise ValueError("Unknown type {!r}".format(config["type"])) raise ValueError("Unknown type {!r}".format(config["type"]))
workers.append( workers.append(
SheetSync(middleware, stop, dbmanager, config["reverse_sync"]), SheetSync(middleware, stop, dbmanager, config.get("reverse_sync", False)),
) )
jobs = [gevent.spawn(worker.run) for worker in workers] jobs = [gevent.spawn(worker.run) for worker in workers]

@ -1,5 +1,6 @@
import json import json
import logging
import requests import requests
@ -19,11 +20,14 @@ class StreamLogClient():
return self.request("GET", "event", self.event_id, "log") return self.request("GET", "event", self.event_id, "log")
def write_value(self, row_id, key, value): def write_value(self, row_id, key, value):
"""Write key=value for the given row""" """Write key=value for the given row, or delete if value=None"""
return self.request("POST", "entry", row_id, key, body=value) if value is None:
return self.request("DELETE", "entry", row_id, key)
else:
return self.request("POST", "entry", row_id, key, body=value)
def request(self, method, *path, body=None): def request(self, method, *path, body=None):
response = self.session.request(method, "/".join(("api", "v1") + path), response = self.session.request(method, "/".join((self.url, "api", "v1") + path),
data=body, data=body,
headers={ headers={
"Authorization": self.auth_token, "Authorization": self.auth_token,
@ -47,12 +51,11 @@ class StreamLogMiddleware:
'description': 'description', 'description': 'description',
'submitter_winner': 'submitter_or_winner', 'submitter_winner': 'submitter_or_winner',
'poster_moment': 'poster_moment', 'poster_moment': 'poster_moment',
'image_links': 'media_link', 'image_links': 'media_links',
'notes': 'notes_to_editor', 'notes': 'notes_to_editor',
'tags': 'tags', 'tags': 'tags',
'video_link': 'video_link', 'video_link': 'video_link',
'state': 'video_state', 'state': 'video_state',
'edit_link': 'editor_link',
'error': 'video_errors', 'error': 'video_errors',
'id': 'id', 'id': 'id',
} }
@ -60,31 +63,47 @@ class StreamLogMiddleware:
# Omitted columns act as the identity function. # Omitted columns act as the identity function.
self.column_decode = { self.column_decode = {
'event_start': parse_utc_only, 'event_start': parse_utc_only,
'event_end': lambda v: None if v["type"] == "NoTime" else parse_utc_only(v["time"]), # New, to switch to.
# 'event_end': lambda v: parse_utc_only(v["time"]) if v["type"] == "Time" else None,
# Old
'event_end': lambda v: None if v is None else parse_utc_only(v),
'category': lambda v: v["name"], 'category': lambda v: v["name"],
'image_links': lambda v: [link.strip() for link in v.split()] if v.strip() else [], 'state': lambda v: None if v is None else v.upper(),
'state': lambda v: v.upper(), 'video_link': lambda v: '' if v is None else v,
} }
# Maps DB column names to an encode function to convert from internal format to streamlog. # Maps DB column names to an encode function to convert from internal format to streamlog.
# Omitted columns act as the identity function. # Omitted columns act as the identity function.
self.column_encode = { self.column_encode = {
'state': lambda v: v[0].upper() + v[1:].lower(), # Titlecase 'state': lambda v: v[0].upper() + v[1:].lower(), # Titlecase
} }
# Maps DB column names to the url part you need to write to to set it.
self.write_map = {
"state": "video_state",
"error": "video_errors",
"video_link": "video",
}
def get_rows(self): def get_rows(self):
for row in self.client.get_rows()["event_log"]: for row in self.client.get_rows()["event_log"]:
yield self.parse_row(row) row = self.parse_row(row)
# Malformed rows can be skipped, represented as a None result
if row is not None:
yield row
def parse_row(self, row): def parse_row(self, row):
output = {} output = {}
for column, key in self.column_map.items(): for column, key in self.column_map.items():
value = row[key] value = row[key]
if column in self.column_decode: if column in self.column_decode:
value = self.column_decode[column](value) try:
value = self.column_decode[column](value)
except Exception:
logging.exception(f"Failed to parse {key} value {value!r} of row {row['id']}, skipping")
return
output[column] = value output[column] = value
# Section name is sheet name # Section name is sheet name
output["sheet_name"] = row["section"]["name"] output["sheet_name"] = row["section"]["name"] if row["section"] else "unknown"
# Implicit tags # Implicit tags
output['tags'] += [ output['tags'] += [
@ -99,7 +118,7 @@ class StreamLogMiddleware:
def write_value(self, row, key, value): def write_value(self, row, key, value):
if key in self.column_encode: if key in self.column_encode:
value = self.column_encode[key](value) value = self.column_encode[key](value)
self.client.write_value(row["id"], key, value) self.client.write_value(row["id"], self.write_map[key], value)
def mark_modified(self, row): def mark_modified(self, row):
pass # not a concept we have pass # not a concept we have

Loading…
Cancel
Save