@@ -52,19 +52,26 @@ ad_segments_ignored = prom.Counter(
suspicious_skew_count = prom . Counter (
" suspicious_skew_count " ,
" Number of times we' ve restarted a worker due to suspicious skew ",
" ",
[ " channel " , " quality " ] ,
)
segment_time_skew = prom . Histogram (
" segment_time_skew " ,
" " ,
[ " channel " , " quality " , " worker " ] ,
buckets = [ - 10 , - 1 , - 0.5 , - 0.1 , - 0.01 , - 0.001 , 0 , 0.001 , 0.01 , 0.1 , 0.5 , 1 , 10 ] ,
)
segment_time_skew_non_zero_sum = prom . Gauge (
" segment_time_skew_non_zero_sum " ,
" Sum of all observed segment skew amounts for worker " ,
" ",
[ " channel " , " quality " , " worker " ] ,
)
segment_time_skew_non_zero_count = prom . Counter (
" segment_time_skew_non_zero_count " ,
" Count of segments with non-zero skew for worker ",
" ",
[ " channel " , " quality " , " worker " ] ,
)
@@ -141,7 +148,7 @@ class StreamsManager(object):
FETCH_TIMEOUTS = 5 , 30
def __init__ ( self , provider , channel , base_dir , qualities , important = False , history_size = 0 ):
def __init__ ( self , provider , channel , base_dir , qualities , important = False ):
self . provider = provider
self . channel = channel
self . logger = logging . getLogger ( " StreamsManager( {} ) " . format ( channel ) )
@@ -222,7 +229,7 @@ class StreamsManager(object):
self . logger . info ( " Ignoring worker start as we are stopping " )
return
url_time , url = self . latest_urls [ quality ]
worker = StreamWorker ( self , quality , url , url_time , self . history_size )
worker = StreamWorker ( self , quality , url , url_time )
self . stream_workers [ quality ] . append ( worker )
gevent . spawn ( worker . run )
@@ -314,7 +321,7 @@ class StreamWorker(object):
# See https://github.com/dbvideostriketeam/wubloader/issues/539
MAX_SEGMENT_TIME_SKEW = 0.01
def __init__ ( self , manager , quality , url , url_time , history_size ):
def __init__ ( self , manager , quality , url , url_time ):
self . manager = manager
self . logger = manager . logger . getChild ( " StreamWorker( {} )@ {:x} " . format ( quality , id ( self ) ) )
self . quality = quality
@@ -336,9 +343,6 @@ class StreamWorker(object):
# We expect the map data to be very small so there is no eviction here.
# {uri: data}
self . map_cache = { }
# If enabled, playlist history is saved after each segment fetch,
# showing the last N playlist fetches up until the one that resulted in that fetch.
self . history_size = history_size
def __repr__ ( self ) :
return " < {} at 0x {:x} for stream {!r} > " . format ( type ( self ) . __name__ , id ( self ) , self . quality )
@@ -378,6 +382,7 @@ class StreamWorker(object):
first = True
suspicious_skew = False
history = [ ]
HISTORY_SIZE = 10
while not self . stopping . is_set ( ) :
self . logger . debug ( " Getting media playlist {} " . format ( self . url ) )
@@ -400,8 +405,7 @@ class StreamWorker(object):
# We successfully got the playlist at least once
first = False
if self . history_size > 0 :
history = [ ( playlist_time , raw_playlist ) ] + history [ : self . history_size ]
history = [ ( playlist_time , raw_playlist ) ] + history [ : HISTORY_SIZE ]
# Start any new segment getters
date = None # tracks date in case some segment doesn't include it
@@ -427,6 +431,7 @@ class StreamWorker(object):
# 2. treat all subsequent segments as "suspect" so they are still saved
# but only used if no other source is available.
skew = ( date - new_date ) . total_seconds ( )
segment_time_skew . labels ( self . manager . channel , self . quality , f " { id ( self ) : x } " ) . observe ( skew )
if skew != 0 :
segment_time_skew_non_zero_sum . labels ( self . manager . channel , self . quality , f " { id ( self ) : x } " ) . inc ( skew )
segment_time_skew_non_zero_count . labels ( self . manager . channel , self . quality , f " { id ( self ) : x } " ) . inc ( )
@@ -680,8 +685,7 @@ class SegmentGetter(object):
stat = latest_segment . labels ( channel = self . channel , quality = self . quality )
timestamp = ( self . date - datetime . datetime ( 1970 , 1 , 1 ) ) . total_seconds ( )
stat . set ( max ( stat . _value . get ( ) , timestamp ) ) # NOTE: not thread-safe but is gevent-safe
if self . history :
self . write_history ( full_path )
self . write_history ( full_path )
def write_history ( self , segment_path ) :
segment_path = os . path . relpath ( segment_path , self . base_dir )
@@ -712,7 +716,7 @@ def parse_channel(channel):
" This affects retry interval, error reporting and monitoring. "
" Non-twitch URLs can also be given with the form CHANNEL[!]:TYPE:URL "
)
def main ( channels , base_dir = " . " , qualities = " source " , metrics_port = 8001 , backdoor_port = 0 , twitch_auth_file = None , playlist_debug = 0 ):
def main ( channels , base_dir = " . " , qualities = " source " , metrics_port = 8001 , backdoor_port = 0 , twitch_auth_file = None ):
qualities = qualities . split ( " , " ) if qualities else [ ]
twitch_auth_token = None
@@ -733,7 +737,7 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
channel_qualities = [ " source " ]
else :
raise ValueError ( f " Unknown type { type !r} " )
manager = StreamsManager ( provider , channel , base_dir , channel_qualities , important = important , history_size = playlist_debug )
manager = StreamsManager ( provider , channel , base_dir , channel_qualities , important = important )
managers . append ( manager )
def stop ( ) :