@ -23,34 +23,37 @@ from .streamlog import StreamLogClient, StreamLogMiddleware
# Module-level Prometheus metric objects, created through the `prom` module alias.
# NOTE(review): every line in this section appears twice, and three of the label
# lists appear in two variants (with and without a leading 'name' entry). Together
# with the "@ ..." hunk header preceding it, this looks like the old and new side of
# a diff rendered line by line - confirm which side is authoritative before reuse.
# NOTE(review): the padding inside the quoted metric and label names is presumably
# an artifact of the same rendering; verify against the real file.

# Counter incremented on the success path of a sync loop.
sheets_synced = prom . Counter (
sheets_synced = prom . Counter (
' sheets_synced ' ,
' sheets_synced ' ,
' Number of successful sheet syncs ' ,
' Number of successful sheet syncs ' ,
[ ' name ' ] ,
)
)
# Histogram fed with `monotonic() - sync_start` after a successful sync.
sheet_sync_duration = prom . Histogram (
sheet_sync_duration = prom . Histogram (
' sheet_sync_duration ' ,
' sheet_sync_duration ' ,
' Time taken to complete a sheet sync ' ,
' Time taken to complete a sheet sync ' ,
[ ' name ' ] ,
)
)
# Counter incremented in the exception-handling path of a sync loop.
sync_errors = prom . Counter (
sync_errors = prom . Counter (
' sync_errors ' ,
' sync_errors ' ,
' Number of errors syncing sheets ' ,
' Number of errors syncing sheets ' ,
[ ' name ' ] ,
)
)
# Counter incremented per row handled by sync_row; the worksheet label value is
# taken from the row's sheet name.
rows_found = prom . Counter (
rows_found = prom . Counter (
' rows_found ' ,
' rows_found ' ,
' Number of rows that sheetsync looked at with an id ' ,
' Number of rows that sheetsync looked at with an id ' ,
[ ' worksheet' ] ,
[ ' name' , ' worksheet' ] ,
)
)
# Counter incremented when a row is inserted into the DB ('insert'), when DB inputs
# are updated from the sheet ('input'), or when sheet outputs are written ('output').
rows_changed = prom . Counter (
rows_changed = prom . Counter (
' rows_changed ' ,
' rows_changed ' ,
' Number of rows that needed changes applied, with type=insert, type=input or type=output ' ,
' Number of rows that needed changes applied, with type=insert, type=input or type=output ' ,
[ ' type' , ' worksheet ' ] ,
[ ' name' , ' type' , ' worksheet ' ] ,
)
)
# Gauge whose per-label-combination values are set from a `counts` mapping each time
# the database rows are read.
event_counts = prom . Gauge (
event_counts = prom . Gauge (
' event_counts ' ,
' event_counts ' ,
' Number of rows in the database ' ,
' Number of rows in the database ' ,
[ ' sheet_name' , ' category ' , ' poster_moment ' , ' state ' , ' errored ' ] ,
[ ' name' , ' sheet_name' , ' category ' , ' poster_moment ' , ' state ' , ' errored ' ] ,
)
)
@ -70,7 +73,9 @@ class SheetSync(object):
# Time to wait after getting an error
# Time to wait after getting an error
ERROR_RETRY_INTERVAL = 10
ERROR_RETRY_INTERVAL = 10
def __init__ ( self , middleware , stop , dbmanager , reverse_sync = False ) :
def __init__ ( self , name , middleware , stop , dbmanager , reverse_sync = False ) :
self . name = name
self . logger = logging . getLogger ( type ( self ) . __name__ ) . getChild ( name )
self . middleware = middleware
self . middleware = middleware
self . stop = stop
self . stop = stop
self . dbmanager = dbmanager
self . dbmanager = dbmanager
@ -117,7 +122,7 @@ class SheetSync(object):
for row in self . middleware . get_rows ( ) :
for row in self . middleware . get_rows ( ) :
if row [ ' id ' ] in seen :
if row [ ' id ' ] in seen :
logging . error ( " Duplicate id {} , skipping " . format ( row [ ' id ' ] ) )
self . logger . error ( " Duplicate id {} , skipping " . format ( row [ ' id ' ] ) )
continue
continue
seen . add ( row [ ' id ' ] )
seen . add ( row [ ' id ' ] )
self . sync_row ( row , db_rows . get ( row [ ' id ' ] ) )
self . sync_row ( row , db_rows . get ( row [ ' id ' ] ) )
@ -130,8 +135,8 @@ class SheetSync(object):
detail = ' '
detail = ' '
if isinstance ( e , HTTPError ) :
if isinstance ( e , HTTPError ) :
detail = " : {} " . format ( e . response . content )
detail = " : {} " . format ( e . response . content )
logging . exception ( " Failed to sync {} " . format ( detail ) )
self . logger . exception ( " Failed to sync {} " . format ( detail ) )
sync_errors . inc( )
sync_errors . labels( self . name ) . inc( )
# To ensure a fresh slate and clear any DB-related errors, get a new conn on error.
# To ensure a fresh slate and clear any DB-related errors, get a new conn on error.
# This is heavy-handed but simple and effective.
# This is heavy-handed but simple and effective.
# If we can't re-connect, the program will crash from here,
# If we can't re-connect, the program will crash from here,
@ -139,9 +144,9 @@ class SheetSync(object):
self . conn = self . dbmanager . get_conn ( )
self . conn = self . dbmanager . get_conn ( )
wait ( self . stop , sync_start , self . ERROR_RETRY_INTERVAL )
wait ( self . stop , sync_start , self . ERROR_RETRY_INTERVAL )
else :
else :
logging . info ( " Successful sync " )
self . logger . info ( " Successful sync " )
sheets_synced . inc( )
sheets_synced . labels( self . name ) . inc( )
sheet_sync_duration . observe( monotonic ( ) - sync_start )
sheet_sync_duration . labels( self . name ) . observe( monotonic ( ) - sync_start )
wait ( self . stop , sync_start , self . RETRY_INTERVAL )
wait ( self . stop , sync_start , self . RETRY_INTERVAL )
def get_db_rows ( self ) :
def get_db_rows ( self ) :
@ -165,7 +170,7 @@ class SheetSync(object):
# or else any values we don't update will remain as a stale count.
# or else any values we don't update will remain as a stale count.
# NOTE(review): `_metrics` is a private attribute of the gauge object, and clear() is
# applied to the whole gauge rather than to the series belonging to this syncer.
# One variant below writes series prefixed with self.name, and main() builds a list
# of several workers; if they share event_counts, this clear() presumably also drops
# the series the other workers just wrote - confirm, and consider removing only the
# label sets for this name.
event_counts . _metrics . clear ( )
event_counts . _metrics . clear ( )
# Publish one gauge value per label combination found in `counts`.
for labels , count in counts . items ( ) :
for labels , count in counts . items ( ) :
event_counts . labels ( * labels ) . set ( count )
event_counts . labels ( self . name , * labels ) . set ( count )
# `by_id` is built earlier in this method (not visible in this excerpt).
return by_id
return by_id
def sync_row ( self , sheet_row , db_row ) :
def sync_row ( self , sheet_row , db_row ) :
@ -178,7 +183,7 @@ class SheetSync(object):
assert sheet_row
assert sheet_row
worksheet = sheet_row [ " sheet_name " ]
worksheet = sheet_row [ " sheet_name " ]
# No row currently in DB, create it.
# No row currently in DB, create it.
logging . info ( " Inserting new DB row {} " . format ( sheet_row [ ' id ' ] ) )
self . logger . info ( " Inserting new DB row {} " . format ( sheet_row [ ' id ' ] ) )
# Insertion conflict just means that another sheet sync beat us to the insert.
# Insertion conflict just means that another sheet sync beat us to the insert.
# We can ignore it.
# We can ignore it.
insert_cols = [ ' id ' , ' sheet_name ' ] + self . input_columns
insert_cols = [ ' id ' , ' sheet_name ' ] + self . input_columns
@ -191,21 +196,21 @@ class SheetSync(object):
sql . SQL ( " , " ) . join ( get_column_placeholder ( col ) for col in insert_cols ) ,
sql . SQL ( " , " ) . join ( get_column_placeholder ( col ) for col in insert_cols ) ,
)
)
query ( self . conn , built_query , * * sheet_row )
query ( self . conn , built_query , * * sheet_row )
rows_found . labels ( worksheet ) . inc ( )
rows_found . labels ( self . name , worksheet ) . inc ( )
rows_changed . labels ( ' insert ' , worksheet ) . inc ( )
rows_changed . labels ( self . name , ' insert ' , worksheet ) . inc ( )
self . middleware . mark_modified ( sheet_row )
self . middleware . mark_modified ( sheet_row )
return
return
if sheet_row is None :
if sheet_row is None :
assert db_row
assert db_row
if not self . create_missing_ids :
if not self . create_missing_ids :
logging . info ( " Skipping db row {} without any matching sheet row " . format ( db_row . id ) )
self . logger . info ( " Skipping db row {} without any matching sheet row " . format ( db_row . id ) )
return
return
logging . info ( " Adding new row {} " . format ( db_row . id ) )
self . logger . info ( " Adding new row {} " . format ( db_row . id ) )
sheet_row = self . middleware . create_row ( db_row . sheet_name , db_row . id )
sheet_row = self . middleware . create_row ( db_row . sheet_name , db_row . id )
worksheet = sheet_row [ " sheet_name " ]
worksheet = sheet_row [ " sheet_name " ]
rows_found . labels ( worksheet ) . inc ( )
rows_found . labels ( self . name , worksheet ) . inc ( )
# If no database error, but we have parse errors, indicate they should be displayed.
# If no database error, but we have parse errors, indicate they should be displayed.
if db_row . error is None and sheet_row . get ( ' _parse_errors ' ) :
if db_row . error is None and sheet_row . get ( ' _parse_errors ' ) :
@ -220,7 +225,7 @@ class SheetSync(object):
# Update database with any changed inputs
# Update database with any changed inputs
changed = [ col for col in self . input_columns if sheet_row . get ( col ) != getattr ( db_row , col ) ]
changed = [ col for col in self . input_columns if sheet_row . get ( col ) != getattr ( db_row , col ) ]
if changed :
if changed :
logging . info ( " Updating db row {} with new value(s) for {} " . format (
self . logger . info ( " Updating db row {} with new value(s) for {} " . format (
sheet_row [ ' id ' ] , ' , ' . join ( changed )
sheet_row [ ' id ' ] , ' , ' . join ( changed )
) )
) )
built_query = sql . SQL ( """
built_query = sql . SQL ( """
@ -233,21 +238,21 @@ class SheetSync(object):
) for col in changed
) for col in changed
) )
) )
query ( self . conn , built_query , * * sheet_row )
query ( self . conn , built_query , * * sheet_row )
rows_changed . labels ( ' input ' , worksheet ) . inc ( )
rows_changed . labels ( self . name , ' input ' , worksheet ) . inc ( )
self . middleware . mark_modified ( sheet_row )
self . middleware . mark_modified ( sheet_row )
# Update sheet with any changed outputs
# Update sheet with any changed outputs
changed = [ col for col in self . output_columns if sheet_row . get ( col ) != getattr ( db_row , col ) ]
changed = [ col for col in self . output_columns if sheet_row . get ( col ) != getattr ( db_row , col ) ]
if changed :
if changed :
logging . info ( " Updating sheet row {} with new value(s) for {} " . format (
self . logger . info ( " Updating sheet row {} with new value(s) for {} " . format (
sheet_row [ ' id ' ] , ' , ' . join ( changed )
sheet_row [ ' id ' ] , ' , ' . join ( changed )
) )
) )
for col in changed :
for col in changed :
logging . debug ( " Writing to sheet {} {!r} -> {!r} " . format ( col , sheet_row . get ( col ) , getattr ( db_row , col ) ) )
self . logger . debug ( " Writing to sheet {} {!r} -> {!r} " . format ( col , sheet_row . get ( col ) , getattr ( db_row , col ) ) )
self . middleware . write_value (
self . middleware . write_value (
sheet_row , col , getattr ( db_row , col ) ,
sheet_row , col , getattr ( db_row , col ) ,
)
)
rows_changed . labels ( ' output ' , worksheet ) . inc ( )
rows_changed . labels ( self . name , ' output ' , worksheet ) . inc ( )
self . middleware . mark_modified ( sheet_row )
self . middleware . mark_modified ( sheet_row )
@ -280,7 +285,7 @@ class PlaylistSync:
if isinstance ( e , HTTPError ) :
if isinstance ( e , HTTPError ) :
detail = " : {} " . format ( e . response . content )
detail = " : {} " . format ( e . response . content )
logging . exception ( " Failed to sync {} " . format ( detail ) )
logging . exception ( " Failed to sync {} " . format ( detail ) )
sync_errors . inc( )
sync_errors . labels( " playlists " ) . inc( )
# To ensure a fresh slate and clear any DB-related errors, get a new conn on error.
# To ensure a fresh slate and clear any DB-related errors, get a new conn on error.
# This is heavy-handed but simple and effective.
# This is heavy-handed but simple and effective.
# If we can't re-connect, the program will crash from here,
# If we can't re-connect, the program will crash from here,
@ -289,8 +294,8 @@ class PlaylistSync:
wait ( self . stop , sync_start , self . ERROR_RETRY_INTERVAL )
wait ( self . stop , sync_start , self . ERROR_RETRY_INTERVAL )
else :
else :
logging . info ( " Successful sync of playlists " )
logging . info ( " Successful sync of playlists " )
sheets_synced . inc( )
sheets_synced . labels( " playlists " ) . inc( )
sheet_sync_duration . observe( monotonic ( ) - sync_start )
sheet_sync_duration . labels( " playlists " ) . observe( monotonic ( ) - sync_start )
wait ( self . stop , sync_start , self . RETRY_INTERVAL )
wait ( self . stop , sync_start , self . RETRY_INTERVAL )
def sync_playlists ( self , rows ) :
def sync_playlists ( self , rows ) :
@ -424,7 +429,7 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
else :
else :
raise ValueError ( " Unknown type {!r} " . format ( config [ " type " ] ) )
raise ValueError ( " Unknown type {!r} " . format ( config [ " type " ] ) )
workers . append (
workers . append (
SheetSync ( middleware, stop , dbmanager , config . get ( " reverse_sync " , False ) ) ,
SheetSync ( config[ " type " ] , middleware, stop , dbmanager , config . get ( " reverse_sync " , False ) ) ,
)
)
jobs = [ gevent . spawn ( worker . run ) for worker in workers ]
jobs = [ gevent . spawn ( worker . run ) for worker in workers ]