@ -126,8 +126,12 @@ class SheetSync(object):
bustime = common . parse_bustime ( value )
return common . bustime_to_dt ( self . bustime_start , bustime )
def wait ( self , interval ) :
self . stop . wait ( common . jitter ( interval ) )
def wait ( self , base , interval ) :
""" Wait until INTERVAL seconds after BASE. """
now = monotonic ( )
to_wait = base + common . jitter ( interval ) - now
if to_wait > 0 :
self . stop . wait ( to_wait )
def run ( self ) :
self . conn = self . dbmanager . get_conn ( )
@ -136,7 +140,7 @@ class SheetSync(object):
sync_count = 0
while not self . stop . is_set ( ) :
try :
# Since the full dataset is small, the cost of round tripping to the database to check
# each row is more expensive than the cost of just grabbing the entire table
@ -150,7 +154,10 @@ class SheetSync(object):
worksheets = sorted (
self . worksheets . keys ( ) , key = lambda k : self . worksheets [ k ] , reverse = True ,
) [ : self . ACTIVE_SHEET_COUNT ]
sync_count + = 1
sync_start = monotonic ( )
for worksheet in worksheets :
rows = self . sheets . get_rows ( self . sheet_id , worksheet )
for row_index , row in enumerate ( rows ) :
@ -173,11 +180,11 @@ class SheetSync(object):
# If we can't re-connect, the program will crash from here,
# then restart and wait until it can connect again.
self . conn = self . dbmanager . get_conn ( )
self . wait ( self . ERROR_RETRY_INTERVAL )
self . wait ( sync_start , self . ERROR_RETRY_INTERVAL )
else :
logging . info ( " Successful sync " )
sheets_synced . inc ( )
self . wait ( self . RETRY_INTERVAL )
self . wait ( sync_start , self . RETRY_INTERVAL )
def get_events ( self ) :
""" Return the entire events table as a map { id: event namedtuple} """