|
|
|
@ -10,7 +10,6 @@ import gevent.event
|
|
|
|
|
import prometheus_client as prom
|
|
|
|
|
from monotonic import monotonic
|
|
|
|
|
from psycopg2 import sql
|
|
|
|
|
from psycopg2.extras import execute_values
|
|
|
|
|
from requests import HTTPError
|
|
|
|
|
|
|
|
|
|
import common
|
|
|
|
@ -18,7 +17,7 @@ import common.dateutil
|
|
|
|
|
from common.database import DBManager, query, get_column_placeholder
|
|
|
|
|
from common.sheets import Sheets as SheetsClient
|
|
|
|
|
|
|
|
|
|
from .sheets import SheetsEventsMiddleware
|
|
|
|
|
from .sheets import SheetsEventsMiddleware, SheetsPlaylistsMiddleware
|
|
|
|
|
from .streamlog import StreamLogClient, StreamLogMiddleware
|
|
|
|
|
|
|
|
|
|
sheets_synced = prom.Counter(
|
|
|
|
@ -301,80 +300,20 @@ class EventsSync(SheetSync):
|
|
|
|
|
|
|
|
|
|
super().sync_row(sheet_row, db_row)
|
|
|
|
|
|
|
|
|
|
class PlaylistSync:
|
|
|
|
|
|
|
|
|
|
# Time between syncs
|
|
|
|
|
RETRY_INTERVAL = 20
|
|
|
|
|
|
|
|
|
|
# Time to wait after getting an error
|
|
|
|
|
ERROR_RETRY_INTERVAL = 20
|
|
|
|
|
class PlaylistsSync(SheetSync):
|
|
|
|
|
|
|
|
|
|
def __init__(self, stop, dbmanager, sheets, sheet_id, worksheet):
|
|
|
|
|
self.stop = stop
|
|
|
|
|
self.dbmanager = dbmanager
|
|
|
|
|
self.sheets = sheets
|
|
|
|
|
self.sheet_id = sheet_id
|
|
|
|
|
self.worksheet = worksheet
|
|
|
|
|
# Slower poll rate than events to avoid using large amounts of quota
|
|
|
|
|
retry_interval = 20
|
|
|
|
|
error_retry_interval = 20
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
self.conn = self.dbmanager.get_conn()
|
|
|
|
|
|
|
|
|
|
while not self.stop.is_set():
|
|
|
|
|
try:
|
|
|
|
|
sync_start = monotonic()
|
|
|
|
|
rows = self.sheets.get_rows(self.sheet_id, self.worksheet)
|
|
|
|
|
self.sync_playlists(rows)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# for HTTPErrors, http response body includes the more detailed error
|
|
|
|
|
detail = ''
|
|
|
|
|
if isinstance(e, HTTPError):
|
|
|
|
|
detail = ": {}".format(e.response.content)
|
|
|
|
|
logging.exception("Failed to sync{}".format(detail))
|
|
|
|
|
sync_errors.labels("playlists").inc()
|
|
|
|
|
# To ensure a fresh slate and clear any DB-related errors, get a new conn on error.
|
|
|
|
|
# This is heavy-handed but simple and effective.
|
|
|
|
|
# If we can't re-connect, the program will crash from here,
|
|
|
|
|
# then restart and wait until it can connect again.
|
|
|
|
|
self.conn = self.dbmanager.get_conn()
|
|
|
|
|
wait(self.stop, sync_start, self.ERROR_RETRY_INTERVAL)
|
|
|
|
|
else:
|
|
|
|
|
logging.info("Successful sync of playlists")
|
|
|
|
|
sheets_synced.labels("playlists").inc()
|
|
|
|
|
sheet_sync_duration.labels("playlists").observe(monotonic() - sync_start)
|
|
|
|
|
wait(self.stop, sync_start, self.RETRY_INTERVAL)
|
|
|
|
|
|
|
|
|
|
def sync_playlists(self, rows):
|
|
|
|
|
"""Parse rows with a valid playlist id and at least one tag,
|
|
|
|
|
overwriting the entire playlists table"""
|
|
|
|
|
playlists = []
|
|
|
|
|
for row in rows:
|
|
|
|
|
if len(row) == 5:
|
|
|
|
|
tags, _, name, playlist_id, show_in_description = row
|
|
|
|
|
elif len(row) == 4:
|
|
|
|
|
tags, _, name, playlist_id = row
|
|
|
|
|
show_in_description = ""
|
|
|
|
|
else:
|
|
|
|
|
continue
|
|
|
|
|
tags = [tag.strip() for tag in tags.split(',') if tag.strip()]
|
|
|
|
|
if not tags:
|
|
|
|
|
continue
|
|
|
|
|
# special-case for the "all everything" list,
|
|
|
|
|
# we don't want "no tags" to mean "all videos" so we need a sentinel value.
|
|
|
|
|
if tags == ["<all>"]:
|
|
|
|
|
tags = []
|
|
|
|
|
playlist_id = playlist_id.strip()
|
|
|
|
|
if len(playlist_id) != 34 or not playlist_id.startswith('PL'):
|
|
|
|
|
continue
|
|
|
|
|
show_in_description = show_in_description == "[✓]"
|
|
|
|
|
playlists.append((playlist_id, tags, name, show_in_description))
|
|
|
|
|
# We want to wipe and replace all the current entries in the table.
|
|
|
|
|
# The easiest way to do this is a DELETE then an INSERT, all within a transaction.
|
|
|
|
|
# The "with" block will perform everything under it within a transaction, rolling back
|
|
|
|
|
# on error or committing on exit.
|
|
|
|
|
logging.info("Updating playlists table with {} playlists".format(len(playlists)))
|
|
|
|
|
with self.conn:
|
|
|
|
|
query(self.conn, "DELETE FROM playlists")
|
|
|
|
|
execute_values(self.conn.cursor(), "INSERT INTO playlists(playlist_id, tags, name, show_in_description) VALUES %s", playlists)
|
|
|
|
|
table = "playlists"
|
|
|
|
|
input_columns = {
|
|
|
|
|
"playlist_id",
|
|
|
|
|
"tags",
|
|
|
|
|
"name",
|
|
|
|
|
"show_in_description",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@argh.arg('dbconnect', help=
|
|
|
|
@ -460,8 +399,14 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
|
|
|
|
|
config.get("allocate_ids", False),
|
|
|
|
|
)
|
|
|
|
|
if "playlist_worksheet" in config:
|
|
|
|
|
middleware = SheetsPlaylistsMiddleware(
|
|
|
|
|
client,
|
|
|
|
|
config["sheet_id"],
|
|
|
|
|
[config["playlist_worksheet"]],
|
|
|
|
|
config.get("allocate_ids", False),
|
|
|
|
|
)
|
|
|
|
|
workers.append(
|
|
|
|
|
PlaylistSync(stop, dbmanager, client, config["sheet_id"], config["playlist_worksheet"])
|
|
|
|
|
PlaylistsSync("playlists", middleware, stop, dbmanager, config.get("reverse_sync", False))
|
|
|
|
|
)
|
|
|
|
|
elif config["type"] == "streamlog":
|
|
|
|
|
auth_token = open(config["creds"]).read().strip()
|
|
|
|
|