@ -44,6 +44,12 @@ hash_mismatches = prom.Counter(
[ ' remote ' , ' channel ' , ' quality ' , ' hour ' ] ,
)
# Gauge labelled by remote, channel, quality and hour. Per its description it
# reports how many segments were not pulled because they differ from an existing
# segment by only a very small time difference. BackfillerWorker writes it with
# .set(), i.e. as an absolute value rather than an increment.
small_difference_segments = prom . Gauge (
' small_difference_segments ' ,
' Number of segments which were not pulled due to differing from existing segments by only a very small time difference ' ,
[ ' remote ' , ' channel ' , ' quality ' , ' hour ' ] ,
)
node_list_errors = prom . Counter (
' node_list_errors ' ,
' Number of errors fetching a list of nodes ' ,
@ -504,8 +510,20 @@ class BackfillerWorker(object):
# multiple workers request the same segment at the same time
random . shuffle ( missing_segments )
if quality != ' chat ' :
# Attributes that must all compare equal between the candidate segment and a
# local segment for the two to be treated as the same segment.
MATCH_FIELDS = ( " channel " , " quality " , " duration " , " type " , " hash " )
# Largest start-time difference, in seconds, that is still treated as a match.
EPSILON = 0.001
# Parsed segment info for every local file that could be parsed.
local_infos = [ ]
for path in local_segments :
# Rebuild the path from channel/quality/hour plus the listed name before
# parsing; this rebinds the loop variable.
path = os . path . join ( channel , quality , hour , path )
try :
local_infos . append ( common . parse_segment_path ( path ) )
except ValueError as e :
# An unparseable local file is logged and left out of the comparison list.
self . logger . warning ( ' Local file {} could not be parsed: {} ' . format ( path , e ) )
pool = gevent . pool . Pool ( self . download_concurrency )
workers = [ ]
small_differences = 0
for missing_segment in missing_segments :
@ -543,12 +561,29 @@ class BackfillerWorker(object):
self . logger . debug ( ' Skipping {} as too recent ' . format ( path ) )
continue
# if any local segment is within 1ms of the missing segment and otherwise identical, ignore it
found = None
for local_segment in local_infos :
# if any fields differ, no match
if not all ( getattr ( segment , field ) == getattr ( local_segment , field ) for field in MATCH_FIELDS ) :
continue
# if time difference > epsilon, no match
if abs ( ( segment . start - local_segment . start ) . total_seconds ( ) ) > EPSILON :
continue
found = local_segment
break
if found is not None :
self . logger . debug ( f ' Skipping { path } as within { EPSILON } s of identical segment { found . path } ' )
continue
# start segment as soon as a pool slot opens up, then track it in workers
workers . append ( pool . spawn (
get_remote_segment ,
self . base_dir , self . node , channel , quality , hour , missing_segment , self . logger
) )
small_difference_segments . labels ( self . node , channel , quality , hour ) . set ( small_differences )
# verify that all the workers succeeded. if any failed, raise the exception from
# one of them arbitrarily.
for worker in workers :