@ -8,9 +8,10 @@ import argh
import gevent . backdoor
import gevent . backdoor
import gevent . event
import gevent . event
import prometheus_client as prom
import prometheus_client as prom
from requests import HTTPError
from monotonic import monotonic
from psycopg2 import sql
from psycopg2 import sql
from psycopg2 . extras import register_uuid
from psycopg2 . extras import register_uuid
from requests import HTTPError
import common
import common
import common . dateutil
import common . dateutil
@ -44,15 +45,30 @@ class SheetSync(object):
# Time between syncs
# Time between syncs
RETRY_INTERVAL = 5
RETRY_INTERVAL = 5
# Time to wait after getting an error
# Time to wait after getting an error
ERROR_RETRY_INTERVAL = 10
ERROR_RETRY_INTERVAL = 10
# How many syncs of active sheets to do before checking inactive sheets.
# By checking inactive sheets less often, we stay within our API limits.
# For example, 4 syncs per inactive check * 5 seconds between syncs = 20s between inactive checks
SYNCS_PER_INACTIVE_CHECK = 4
# How many worksheets to keep "active" based on most recent modify time
ACTIVE_SHEET_COUNT = 2
# Expected quota usage per 100s =
# (100 / RETRY_INTERVAL) * ACTIVE_SHEET_COUNT
# + (100 / RETRY_INTERVAL / SYNCS_PER_INACTIVE_CHECK) * (len(worksheets) - ACTIVE_SHEET_COUNT)
# For current values, this is 100/5 * 2 + 100/5/4 * 6 = 70
def __init__ ( self , stop , dbmanager , sheets , sheet_id , worksheets , edit_url , bustime_start , allocate_ids = False ) :
def __init__ ( self , stop , dbmanager , sheets , sheet_id , worksheets , edit_url , bustime_start , allocate_ids = False ) :
self . stop = stop
self . stop = stop
self . dbmanager = dbmanager
self . dbmanager = dbmanager
self . sheets = sheets
self . sheets = sheets
self . sheet_id = sheet_id
self . sheet_id = sheet_id
self . worksheets = worksheets
# map {worksheet: last modify time}
self . worksheets = { w : 0 for w in worksheets }
self . edit_url = edit_url
self . edit_url = edit_url
self . bustime_start = bustime_start
self . bustime_start = bustime_start
self . allocate_ids = allocate_ids
self . allocate_ids = allocate_ids
@ -116,6 +132,9 @@ class SheetSync(object):
def run ( self ) :
def run ( self ) :
self . conn = self . dbmanager . get_conn ( )
self . conn = self . dbmanager . get_conn ( )
# tracks when to do inactive checks
sync_count = 0
while not self . stop . is_set ( ) :
while not self . stop . is_set ( ) :
try :
try :
@ -123,7 +142,16 @@ class SheetSync(object):
# each row is more expensive than the cost of just grabbing the entire table
# each row is more expensive than the cost of just grabbing the entire table
# and comparing locally.
# and comparing locally.
events = self . get_events ( )
events = self . get_events ( )
for worksheet in self . worksheets :
if sync_count % self . SYNCS_PER_INACTIVE_CHECK == 0 :
# check all worksheets
worksheets = self . worksheets
else :
# only check most recently changed worksheets
worksheets = sorted (
self . worksheets . keys ( ) , key = lambda k : self . worksheets [ k ] , reverse = True ,
) [ : self . ACTIVE_SHEET_COUNT ]
sync_count + = 1
for worksheet in worksheets :
rows = self . sheets . get_rows ( self . sheet_id , worksheet )
rows = self . sheets . get_rows ( self . sheet_id , worksheet )
for row_index , row in enumerate ( rows ) :
for row_index , row in enumerate ( rows ) :
# Skip first row (ie. the column titles).
# Skip first row (ie. the column titles).
@ -216,6 +244,7 @@ class SheetSync(object):
query ( self . conn , built_query , sheet_name = worksheet , * * row )
query ( self . conn , built_query , sheet_name = worksheet , * * row )
rows_found . labels ( worksheet ) . inc ( )
rows_found . labels ( worksheet ) . inc ( )
rows_changed . labels ( ' insert ' , worksheet ) . inc ( )
rows_changed . labels ( ' insert ' , worksheet ) . inc ( )
self . mark_modified ( worksheet )
return
return
rows_found . labels ( worksheet ) . inc ( )
rows_found . labels ( worksheet ) . inc ( )
@ -237,6 +266,7 @@ class SheetSync(object):
) )
) )
query ( self . conn , built_query , * * row )
query ( self . conn , built_query , * * row )
rows_changed . labels ( ' input ' , worksheet ) . inc ( )
rows_changed . labels ( ' input ' , worksheet ) . inc ( )
self . mark_modified ( worksheet )
# Update sheet with any changed outputs
# Update sheet with any changed outputs
format_output = lambda v : ' ' if v is None else v # cast nulls to empty string
format_output = lambda v : ' ' if v is None else v # cast nulls to empty string
@ -252,6 +282,7 @@ class SheetSync(object):
format_output ( getattr ( event , col ) ) ,
format_output ( getattr ( event , col ) ) ,
)
)
rows_changed . labels ( ' output ' , worksheet ) . inc ( )
rows_changed . labels ( ' output ' , worksheet ) . inc ( )
self . mark_modified ( worksheet )
# Set edit link if marked for editing and start/end set.
# Set edit link if marked for editing and start/end set.
# This prevents accidents / clicking the wrong row and provides
# This prevents accidents / clicking the wrong row and provides
@ -265,6 +296,12 @@ class SheetSync(object):
row_index , self . column_map [ ' edit_link ' ] ,
row_index , self . column_map [ ' edit_link ' ] ,
edit_link ,
edit_link ,
)
)
self . mark_modified ( worksheet )
def mark_modified ( self , worksheet ) :
""" Mark worksheet as having had a change made, bumping it to the top of
the most - recently - modified queue . """
self . worksheets [ worksheet ] = monotonic ( )
@argh.arg ( ' dbconnect ' , help =
@argh.arg ( ' dbconnect ' , help =