@ -15,6 +15,7 @@ import gevent.backdoor
import gevent . event
import prometheus_client as prom
import requests
import requests . adapters
from monotonic import monotonic
import common
@ -49,6 +50,24 @@ ad_segments_ignored = prom.Counter(
[ " channel " , " quality " ] ,
)
# Counts how many times a worker was restarted because its playlist showed
# a suspicious time skew between consecutive segments.
suspicious_skew_count = prom.Counter(
    "suspicious_skew_count",
    "Number of times we've restarted a worker due to suspicious skew",
    ["channel", "quality"],
)
# Running total of all non-zero skew amounts (seconds) seen by a worker.
segment_time_skew_non_zero_sum = prom.Gauge(
    "segment_time_skew_non_zero_sum",
    "Sum of all observed segment skew amounts for worker",
    ["channel", "quality", "worker"],
)
# Number of segments observed with any non-zero skew, per worker.
segment_time_skew_non_zero_count = prom.Counter(
    "segment_time_skew_non_zero_count",
    "Count of segments with non-zero skew for worker",
    ["channel", "quality", "worker"],
)
class TimedOutError(Exception):
    """Raised to signal that an operation exceeded its allowed time."""
@ -122,7 +141,7 @@ class StreamsManager(object):
FETCH_TIMEOUTS = 5 , 30
def __init__ ( self , provider , channel , base_dir , qualities , important = False ):
def __init__ ( self , provider , channel , base_dir , qualities , important = False , history_size = 0 ):
self . provider = provider
self . channel = channel
self . logger = logging . getLogger ( " StreamsManager( {} ) " . format ( channel ) )
@ -203,7 +222,7 @@ class StreamsManager(object):
self . logger . info ( " Ignoring worker start as we are stopping " )
return
url_time , url = self . latest_urls [ quality ]
worker = StreamWorker ( self , quality , url , url_time )
worker = StreamWorker ( self , quality , url , url_time , self . history_size )
self . stream_workers [ quality ] . append ( worker )
gevent . spawn ( worker . run )
@ -290,7 +309,12 @@ class StreamWorker(object):
FETCH_RETRY_INTERVAL = 1
FETCH_POLL_INTERVAL = 2
def __init__ ( self , manager , quality , url , url_time ) :
# Max difference between a segment's time + duration and the next segment's time (in seconds)
# before we consider this "suspicious" and trigger a refresh.
# See https://github.com/dbvideostriketeam/wubloader/issues/539
MAX_SEGMENT_TIME_SKEW = 0.01
def __init__ ( self , manager , quality , url , url_time , history_size ) :
self . manager = manager
self . logger = manager . logger . getChild ( " StreamWorker( {} )@ {:x} " . format ( quality , id ( self ) ) )
self . quality = quality
@ -305,11 +329,16 @@ class StreamWorker(object):
# This worker's SegmentGetters will use its session by default for performance,
# but will fall back to a new one if something goes wrong.
self . session = common . requests . InstrumentedSession ( )
adapter = requests . adapters . HTTPAdapter ( pool_maxsize = 100 )
self . session . mount ( ' https:// ' , adapter )
# Map cache is a simple cache to avoid re-downloading the same map URI for every segment,
# since it's generally the same but may occasionally change.
# We expect the map data to be very small so there is no eviction here.
# {uri: data}
self . map_cache = { }
# If enabled, playlist history is saved after each segment fetch,
# showing the last N playlist fetches up until the one that resulted in that fetch.
self . history_size = history_size
def __repr__(self):
    """Debug representation including object identity and stream quality."""
    return f"<{type(self).__name__} at 0x{id(self):x} for stream {self.quality!r}>"
@ -347,12 +376,15 @@ class StreamWorker(object):
def _run ( self ) :
first = True
suspicious_skew = False
history = [ ]
while not self . stopping . is_set ( ) :
self . logger . debug ( " Getting media playlist {} " . format ( self . url ) )
try :
with soft_hard_timeout ( self . logger , " getting media playlist " , self . FETCH_TIMEOUTS , self . trigger_new_worker ) :
playlist = self . manager . provider . get_media_playlist ( self . url , session = self . session )
playlist_time = datetime . datetime . utcnow ( )
raw_playlist , playlist = self . manager . provider . get_media_playlist ( self . url , session = self . session )
except Exception as e :
self . logger . warning ( " Failed to fetch media playlist {} " . format ( self . url ) , exc_info = True )
self . trigger_new_worker ( )
@ -368,8 +400,12 @@ class StreamWorker(object):
# We successfully got the playlist at least once
first = False
if self . history_size > 0 :
history = [ ( playlist_time , raw_playlist ) ] + history [ : self . history_size ]
# Start any new segment getters
date = None # tracks date in case some segment doesn't include it
prev_segment = None
for segment in playlist . segments :
if segment . ad_reason :
self . logger . info ( " Ignoring ad segment: {} " . format ( segment . ad_reason ) )
@ -380,7 +416,26 @@ class StreamWorker(object):
self . manager . mark_working ( self )
if segment . date :
date = common . dateutil . parse ( segment . date )
new_date = common . dateutil . parse ( segment . date )
if date is not None :
# We have observed an issue on twitch where there will be a small time jump
# (eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
# and all subsequent segment timestamps will be out by this amount compared
# to what other downloader instances see. Our workaround for this issue is to
# watch for such gaps and:
# 1. trigger a worker refresh (which seems to fix the issue)
# 2. treat all subsequent segments as "suspect" so they are still saved
# but only used if no other source is available.
skew = ( date - new_date ) . total_seconds ( )
if skew != 0 :
segment_time_skew_non_zero_sum . labels ( self . manager . channel , self . quality , f " { id ( self ) : x } " ) . inc ( skew )
segment_time_skew_non_zero_count . labels ( self . manager . channel , self . quality , f " { id ( self ) : x } " ) . inc ( )
if abs ( skew ) > self . MAX_SEGMENT_TIME_SKEW and not suspicious_skew :
self . logger . warning ( f " Suspicious skew of { skew } , triggering new worker: Expected { date } after { prev_segment } , got { new_date } for { segment } " )
self . trigger_new_worker ( )
suspicious_skew = True
suspicious_skew_count . labels ( self . manager . channel , self . quality ) . inc ( )
date = new_date
if segment . uri not in self . getters :
if date is None :
raise ValueError ( " Cannot determine date of segment " )
@ -392,11 +447,14 @@ class StreamWorker(object):
self . quality ,
segment ,
date ,
suspicious_skew ,
self . map_cache ,
history ,
)
gevent . spawn ( self . getters [ segment . uri ] . run )
if date is not None :
date + = datetime . timedelta ( seconds = segment . duration )
prev_segment = segment
# Clean up any old segment getters.
# Note use of list() to make a copy to avoid modification-during-iteration
@ -448,19 +506,21 @@ class SegmentGetter(object):
# or so, to be paranoid we set it to considerably longer than that.
GIVE_UP_TIMEOUT = 20 * 60
def __init__ ( self , parent_logger , session , base_dir , channel , quality , segment , date , map_cache) :
def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache, history):
    """Prepare a getter that will download one segment and save it to disk.

    parent_logger: logger to derive this getter's logger from.
    session: the parent worker's connection pool; replaced if anything goes wrong.
    base_dir, channel, quality: determine where the segment file is written.
    segment: the playlist entry to download.
    date: timestamp assigned to this segment.
    suspect: if True, the segment is saved as type "suspect" even on success.
    map_cache: shared {uri: data} cache of map data, to avoid re-downloading.
    history: recent (time, raw playlist) fetches, saved for debugging if non-empty.
    """
    self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
    self.base_dir = base_dir
    self.channel = channel
    self.quality = quality
    self.segment = segment
    self.date = date
    self.suspect = suspect
    self.map_cache = map_cache
    self.history = history
    self.prefix = self.make_path_prefix()
    # Event, set to begin retrying
    self.retry = None
    # Set once the file exists on disk or we've given up entirely.
    self.done = gevent.event.Event()
    # Our parent's connection pool, but we'll replace it if there's any issues
    self.session = session
def run ( self ) :
try :
@ -517,6 +577,12 @@ class SegmentGetter(object):
Type may be :
full : Segment is complete . Hash is included .
suspect : Segment appears to be complete , but we suspect it is not . Hash is included .
This currently triggers on two conditions :
- If a segment takes a very long time to download , which we ' ve observed to result in
partial files even though they appeared to end correctly .
- If the StreamWorker has encountered a time gap , then we suspect this segment to be
mis - timed . We have observed this where there is a small ( ~ 0.5 s ) time jump , then
all segments are consistently off by that amount compared to other nodes until refresh .
partial : Segment is incomplete . Hash is included .
temp : Segment has not been downloaded yet . A random uuid is added .
"""
@ -601,7 +667,9 @@ class SegmentGetter(object):
raise e
else :
request_duration = monotonic ( ) - start_time
segment_type = " full " if request_duration < self . FETCH_SUSPECT_TIME else " suspect "
segment_type = " full "
if self . suspect or request_duration > = self . FETCH_SUSPECT_TIME :
segment_type = " suspect "
full_path = self . make_path ( segment_type , hash )
self . logger . debug ( " Saving completed segment {} as {} " . format ( temp_path , full_path ) )
common . rename ( temp_path , full_path )
@ -612,6 +680,20 @@ class SegmentGetter(object):
stat = latest_segment . labels ( channel = self . channel , quality = self . quality )
timestamp = ( self . date - datetime . datetime ( 1970 , 1 , 1 ) ) . total_seconds ( )
stat . set ( max ( stat . _value . get ( ) , timestamp ) ) # NOTE: not thread-safe but is gevent-safe
if self . history :
self . write_history ( full_path )
def write_history(self, segment_path):
    """Write out the recent playlist-fetch history for debugging.

    Creates a directory under base_dir/playlist-debug mirroring the segment's
    relative path, then writes one file per history entry, named
    "<index>_<timestamp>" so entries sort in history order (most recent first).
    """
    segment_path = os.path.relpath(segment_path, self.base_dir)
    history_path = os.path.join(self.base_dir, "playlist-debug", segment_path)
    # exist_ok avoids a race-prone try/except around makedirs.
    os.makedirs(history_path, exist_ok=True)
    for n, (timestamp, playlist) in enumerate(self.history):
        filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"))
        path = os.path.join(history_path, filename)
        common.atomic_write(path, playlist)
def parse_channel ( channel ) :
@ -630,7 +712,7 @@ def parse_channel(channel):
" This affects retry interval, error reporting and monitoring. "
" Non-twitch URLs can also be given with the form CHANNEL[!]:TYPE:URL "
)
def main ( channels , base_dir = " . " , qualities = " source " , metrics_port = 8001 , backdoor_port = 0 , twitch_auth_file = None ):
def main ( channels , base_dir = " . " , qualities = " source " , metrics_port = 8001 , backdoor_port = 0 , twitch_auth_file = None , playlist_debug = 0 ):
qualities = qualities . split ( " , " ) if qualities else [ ]
twitch_auth_token = None
@ -651,7 +733,7 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
channel_qualities = [ " source " ]
else :
raise ValueError ( f " Unknown type { type !r} " )
manager = StreamsManager ( provider , channel , base_dir , channel_qualities , important = important )
manager = StreamsManager ( provider , channel , base_dir , channel_qualities , important = important , history_size = playlist_debug )
managers . append ( manager )
def stop ( ) :