@ -109,11 +109,14 @@ class StreamsManager(object):
* A worker is older than MAX_WORKER_AGE
* A worker is older than MAX_WORKER_AGE
"""
"""
FETCH_MIN_INTERVAL = 5
# Important streams are retried more aggressively when down
IMPORTANT_FETCH_MIN_INTERVAL = 5
FETCH_MIN_INTERVAL = 20
FETCH_TIMEOUTS = 5 , 30
FETCH_TIMEOUTS = 5 , 30
MAX_WORKER_AGE = 20 * 60 * 60 # 20 hours, twitch's media playlist links expire after 24 hours
MAX_WORKER_AGE = 20 * 60 * 60 # 20 hours, twitch's media playlist links expire after 24 hours
def __init__ ( self , channel , base_dir , qualities ):
def __init__ ( self , channel , base_dir , qualities , important = False ):
self . channel = channel
self . channel = channel
self . logger = logging . getLogger ( " StreamsManager( {} ) " . format ( channel ) )
self . logger = logging . getLogger ( " StreamsManager( {} ) " . format ( channel ) )
self . base_dir = base_dir
self . base_dir = base_dir
@ -122,6 +125,9 @@ class StreamsManager(object):
self . latest_urls_changed = gevent . event . Event ( ) # set when latest_urls changes
self . latest_urls_changed = gevent . event . Event ( ) # set when latest_urls changes
self . refresh_needed = gevent . event . Event ( ) # set to tell main loop to refresh now
self . refresh_needed = gevent . event . Event ( ) # set to tell main loop to refresh now
self . stopping = gevent . event . Event ( ) # set to tell main loop to stop
self . stopping = gevent . event . Event ( ) # set to tell main loop to stop
self . important = important
if self . important :
self . FETCH_MIN_INTERVAL = self . IMPORTANT_FETCH_MIN_INTERVAL
def mark_working ( self , worker ) :
def mark_working ( self , worker ) :
""" Notify the manager that the given worker is up and running,
""" Notify the manager that the given worker is up and running,
@ -219,7 +225,9 @@ class StreamsManager(object):
self . start_worker ( quality )
self . start_worker ( quality )
except Exception as e :
except Exception as e :
if isinstance ( e , requests . HTTPError ) and e . response is not None and e . response . status_code == 404 :
if isinstance ( e , requests . HTTPError ) and e . response is not None and e . response . status_code == 404 :
self . logger . info ( " Stream is not up. Retrying. " )
# Log about important streams being down at info, but others at debug.
level = logging . INFO if self . important else logging . DEBUG
self . logger . log ( level , " Stream is not up. Retrying. " )
self . trigger_refresh ( )
self . trigger_refresh ( )
else :
else :
self . logger . exception ( " Failed to fetch master playlist " )
self . logger . exception ( " Failed to fetch master playlist " )
@ -563,12 +571,15 @@ class SegmentGetter(object):
stat . set ( max ( stat . _value . get ( ) , timestamp ) ) # NOTE: not thread-safe but is gevent-safe
stat . set ( max ( stat . _value . get ( ) , timestamp ) ) # NOTE: not thread-safe but is gevent-safe
@argh.arg ( ' channels ' , nargs = " + " , help = " Twitch channels to watch " )
@argh.arg ( ' channels ' , nargs = " + " , help =
" Twitch channels to watch. Add a ' ! ' suffix to indicate they ' re expected to be always up. "
" This affects retry interval, error reporting and monitoring. "
)
def main ( channels , base_dir = " . " , qualities = " source " , metrics_port = 8001 , backdoor_port = 0 ) :
def main ( channels , base_dir = " . " , qualities = " source " , metrics_port = 8001 , backdoor_port = 0 ) :
qualities = qualities . split ( " , " ) if qualities else [ ]
qualities = qualities . split ( " , " ) if qualities else [ ]
managers = [
managers = [
StreamsManager ( channel , base_dir , qualities )
StreamsManager ( channel . rstrip ( ' ! ' ) , base_dir , qualities , important = channel . endswith ( ' ! ' ) )
for channel in channels
for channel in channels
]
]