@ -11,7 +11,7 @@ import gevent.event
import prometheus_client as prom
import prometheus_client as prom
from monotonic import monotonic
from monotonic import monotonic
from psycopg2 import sql
from psycopg2 import sql
from psycopg2 . extras import register_uuid
from psycopg2 . extras import register_uuid , execute_values
from requests import HTTPError
from requests import HTTPError
import common
import common
@ -67,9 +67,10 @@ class SheetSync(object):
# Expected quota usage per 100s =
# Expected quota usage per 100s =
# (100 / RETRY_INTERVAL) * ACTIVE_SHEET_COUNT
# (100 / RETRY_INTERVAL) * ACTIVE_SHEET_COUNT
# + (100 / RETRY_INTERVAL / SYNCS_PER_INACTIVE_CHECK) * (len(worksheets) - ACTIVE_SHEET_COUNT)
# + (100 / RETRY_INTERVAL / SYNCS_PER_INACTIVE_CHECK) * (len(worksheets) - ACTIVE_SHEET_COUNT)
# For current values, this is 100/5 * 2 + 100/5/4 * 6 = 70
# If playlist_worksheet is defined, add 1 to len(worksheets).
# For current values, this is 100/5 * 2 + 100/5/4 * 7 = 75
def __init__ ( self , stop , dbmanager , sheets , sheet_id , worksheets , edit_url , bustime_start , allocate_ids = False ):
def __init__ ( self , stop , dbmanager , sheets , sheet_id , worksheets , edit_url , bustime_start , allocate_ids = False , playlist_worksheet = None ):
self . stop = stop
self . stop = stop
self . dbmanager = dbmanager
self . dbmanager = dbmanager
self . sheets = sheets
self . sheets = sheets
@ -79,6 +80,8 @@ class SheetSync(object):
self . edit_url = edit_url
self . edit_url = edit_url
self . bustime_start = bustime_start
self . bustime_start = bustime_start
self . allocate_ids = allocate_ids
self . allocate_ids = allocate_ids
# The playlist worksheet is checked on the same cadence as inactive sheets
self . playlist_worksheet = playlist_worksheet
# Maps DB column names (or general identifier, for non-DB columns) to sheet column indexes.
# Maps DB column names (or general identifier, for non-DB columns) to sheet column indexes.
# Hard-coded for now, future work: determine this from column headers in sheet
# Hard-coded for now, future work: determine this from column headers in sheet
self . column_map = {
self . column_map = {
@ -159,11 +162,13 @@ class SheetSync(object):
if sync_count % self . SYNCS_PER_INACTIVE_CHECK == 0 :
if sync_count % self . SYNCS_PER_INACTIVE_CHECK == 0 :
# check all worksheets
# check all worksheets
worksheets = self . worksheets
worksheets = self . worksheets
playlist_worksheet = self . playlist_worksheet
else :
else :
# only check most recently changed worksheets
# only check most recently changed worksheets
worksheets = sorted (
worksheets = sorted (
self . worksheets . keys ( ) , key = lambda k : self . worksheets [ k ] , reverse = True ,
self . worksheets . keys ( ) , key = lambda k : self . worksheets [ k ] , reverse = True ,
) [ : self . ACTIVE_SHEET_COUNT ]
) [ : self . ACTIVE_SHEET_COUNT ]
playlist_worksheet = None
sync_count + = 1
sync_count + = 1
sync_start = monotonic ( )
sync_start = monotonic ( )
@ -178,6 +183,10 @@ class SheetSync(object):
continue
continue
row = self . parse_row ( worksheet , row )
row = self . parse_row ( worksheet , row )
self . sync_row ( worksheet , row_index , row , events . get ( row [ ' id ' ] ) )
self . sync_row ( worksheet , row_index , row , events . get ( row [ ' id ' ] ) )
if playlist_worksheet is not None :
rows = self . sheets . get_rows ( self . sheet_id , playlist_worksheet )
self . sync_playlists ( rows )
except Exception as e :
except Exception as e :
# for HTTPErrors, http response body includes the more detailed error
# for HTTPErrors, http response body includes the more detailed error
detail = ' '
detail = ' '
@ -344,6 +353,30 @@ class SheetSync(object):
the most - recently - modified queue . """
the most - recently - modified queue . """
self . worksheets [ worksheet ] = monotonic ( )
self . worksheets [ worksheet ] = monotonic ( )
def sync_playlists ( self , rows ) :
""" Parse rows with a valid playlist id and at least one tag,
overwriting the entire playlists table """
playlists = [ ]
for row in rows :
if len ( row ) != 3 :
continue
tags , _ , playlist_id = row
tags = self . column_parsers [ ' tags ' ] ( tags )
if not tags :
continue
playlist_id = playlist_id . strip ( )
if len ( playlist_id ) != 34 or not playlist_id . startswith ( ' PL ' ) :
continue
playlists . append ( ( tags , playlist_id ) )
# We want to wipe and replace all the current entries in the table.
# The easiest way to do this is a DELETE then an INSERT, all within a transaction.
# The "with" block will perform everything under it within a transaction, rolling back
# on error or committing on exit.
logging . info ( " Updating playlists table with {} playlists " . format ( len ( playlists ) ) )
with self . conn :
query ( self . conn , " DELETE FROM playlists " )
execute_values ( self . conn . cursor ( ) , " INSERT INTO playlists(tags, playlist_id) VALUES %s " , playlists )
@argh.arg ( ' dbconnect ' , help =
@argh.arg ( ' dbconnect ' , help =
" dbconnect should be a postgres connection string, which is either a space-separated "
" dbconnect should be a postgres connection string, which is either a space-separated "
@ -369,7 +402,10 @@ class SheetSync(object):
" --allocate-ids means that it will give rows without ids an id. "
" --allocate-ids means that it will give rows without ids an id. "
" Only one sheet sync should have --allocate-ids on for a given sheet at once! "
" Only one sheet sync should have --allocate-ids on for a given sheet at once! "
)
)
def main ( dbconnect , sheets_creds_file , edit_url , bustime_start , sheet_id , worksheet_names , metrics_port = 8005 , backdoor_port = 0 , allocate_ids = False ) :
@argh.arg ( ' --playlist-worksheet ' , help =
" An optional additional worksheet name that holds playlist tag definitions " ,
)
def main ( dbconnect , sheets_creds_file , edit_url , bustime_start , sheet_id , worksheet_names , metrics_port = 8005 , backdoor_port = 0 , allocate_ids = False , playlist_worksheet = None ) :
"""
"""
Sheet sync constantly scans a Google Sheets sheet and a database , copying inputs from the sheet
Sheet sync constantly scans a Google Sheets sheet and a database , copying inputs from the sheet
to the DB and outputs from the DB to the sheet .
to the DB and outputs from the DB to the sheet .
@ -414,6 +450,6 @@ def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksh
refresh_token = sheets_creds [ ' refresh_token ' ] ,
refresh_token = sheets_creds [ ' refresh_token ' ] ,
)
)
SheetSync ( stop , dbmanager , sheets , sheet_id , worksheet_names , edit_url , bustime_start , allocate_ids ). run ( )
SheetSync ( stop , dbmanager , sheets , sheet_id , worksheet_names , edit_url , bustime_start , allocate_ids , playlist_worksheet ). run ( )
logging . info ( " Gracefully stopped " )
logging . info ( " Gracefully stopped " )