@ -27,14 +27,14 @@ import common.requests
segments_downloaded = prom . Counter (
" segments_downloaded " ,
" Number of segments either partially or fully downloaded " ,
[ " partial " , " channel " , " quality " ] ,
[ " type " , " channel " , " quality " ] ,
)
segment_duration_downloaded = prom . Counter (
" segment_duration_downloaded " ,
" Total duration of all segments partially or fully downloaded. "
" Note partial segments still count the full duration. " ,
[ " partial " , " channel " , " quality " ] ,
[ " type " , " channel " , " quality " ] ,
)
latest_segment = prom . Gauge (
@ -420,6 +420,11 @@ class SegmentGetter(object):
# full timeout is for the entire download and stream to disk.
FETCH_HEADERS_TIMEOUTS = 5 , 60
FETCH_FULL_TIMEOUTS = 15 , 240
# Experimentally, we've observed that after 60s, stuck requests will be terminated
# by twitch with incomplete but valid data, without anything indicating an error.
# We assume anything longer than 60s is "suspect", not to be used if we have a
# version that was fetched in a more timely manner.
FETCH_SUSPECT_TIME = 59
# Overall timeout on the Getter before giving up, to prevent always-failing Getters
# from growing without bound and causing resource exhaustion issues.
# The longest we've observed in the wild before a segment goes un-fetchable is 7min
@ -493,6 +498,7 @@ class SegmentGetter(object):
""" Generate filepath for the segment.
Type may be :
full : Segment is complete . Hash is included .
suspect : Segment appears to be complete , but we suspect it is not . Hash is included .
partial : Segment is incomplete . Hash is included .
temp : Segment has not been downloaded yet . A random uuid is added .
"""
@ -500,7 +506,7 @@ class SegmentGetter(object):
return " {} - {} - {} .ts " . format ( self . prefix , type , arg )
def exists ( self ) :
""" Look for an existing, full (non-partial ) copy of this segment. Return bool."""
""" Look for an existing, full (non-partial , non-suspect ) copy of this segment. Return bool."""
dirname = os . path . dirname ( self . prefix )
try :
candidates = os . listdir ( dirname )
@ -530,6 +536,7 @@ class SegmentGetter(object):
file_created = False
try :
self . logger . debug ( " Downloading segment {} to {} " . format ( self . segment , temp_path ) )
start_time = monotonic ( )
with soft_hard_timeout ( self . logger , " getting and writing segment " , self . FETCH_FULL_TIMEOUTS , retry . set ) :
with soft_hard_timeout ( self . logger , " getting segment headers " , self . FETCH_HEADERS_TIMEOUTS , retry . set ) :
resp = self . session . get ( self . segment . uri , stream = True , metric_name = ' get_segment ' )
@ -556,15 +563,17 @@ class SegmentGetter(object):
partial_path = self . make_path ( " partial " , hash )
self . logger . warning ( " Saving partial segment {} as {} " . format ( temp_path , partial_path ) )
common . rename ( temp_path , partial_path )
segments_downloaded . labels ( partial = " True " , channel = self . channel , quality = self . quality ) . inc ( )
segment_duration_downloaded . labels ( partial = " True " , channel = self . channel , quality = self . quality ) . inc ( self . segment . duration )
segments_downloaded . labels ( type = " partial " , channel = self . channel , quality = self . quality ) . inc ( )
segment_duration_downloaded . labels ( type = " partial " , channel = self . channel , quality = self . quality ) . inc ( self . segment . duration )
raise ex_type , ex , tb
else :
full_path = self . make_path ( " full " , hash )
request_duration = monotonic ( ) - start_time
segment_type = " full " if request_duration < self . FETCH_SUSPECT_TIME else " suspect "
full_path = self . make_path ( segment_type , hash )
self . logger . debug ( " Saving completed segment {} as {} " . format ( temp_path , full_path ) )
common . rename ( temp_path , full_path )
segments_downloaded . labels ( partial = " False " , channel = self . channel , quality = self . quality ) . inc ( )
segment_duration_downloaded . labels ( partial = " False " , channel = self . channel , quality = self . quality ) . inc ( self . segment . duration )
segments_downloaded . labels ( type = segment_type , channel = self . channel , quality = self . quality ) . inc ( )
segment_duration_downloaded . labels ( type = segment_type , channel = self . channel , quality = self . quality ) . inc ( self . segment . duration )
# Prom doesn't provide a way to compare value to gauge's existing value,
# we need to reach into internals
stat = latest_segment . labels ( channel = self . channel , quality = self . quality )