@@ -32,16 +32,16 @@ TIMEOUT = 5 #default timeout in seconds for remote requests or exceptions
 MAX_BACKOFF = 4 #number of times to back off
 
-def list_local_segments(base_dir, stream, variant, hour):
+def list_local_segments(base_dir, channel, quality, hour):
     """List segments in a given hour directory.
 
-    For a given base_dir/stream/variant/hour directory return a list of
+    For a given base_dir/channel/quality/hour directory return a list of
     non-hidden files. If the directory path is not found, return an empty list.
 
     Based on restreamer.list_segments. We could just call
     restreamer.list_segments but this avoids HTTP/JSON overheads."""
-    path = os.path.join(base_dir, stream, variant, hour)
+    path = os.path.join(base_dir, channel, quality, hour)
     try:
         return [name for name in os.listdir(path) if not name.startswith('.')]
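
Since the hunk boundary splits list_local_segments in two, here is a condensed standalone version of the renamed helper showing its complete behaviour. The directory values are illustrative, and the bare OSError catch is an assumption (the repo may inspect errno before swallowing the error):

import os

def list_local_segments(base_dir, channel, quality, hour):
    # List non-hidden files in base_dir/channel/quality/hour,
    # returning [] if the hour directory does not exist yet.
    path = os.path.join(base_dir, channel, quality, hour)
    try:
        return [name for name in os.listdir(path) if not name.startswith('.')]
    except OSError:
        return []

# Hypothetical layout: /segments/desertbus/source/2019-11-09T21/<segment files>
print(list_local_segments('/segments', 'desertbus', 'source', '2019-11-09T21'))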
@@ -51,31 +51,31 @@ def list_local_segments(base_dir, stream, variant, hour):
         return []
 
-def list_remote_hours(node, stream, variant, timeout=TIMEOUT):
+def list_remote_hours(node, channel, quality, timeout=TIMEOUT):
     """Wrapper around a call to restreamer.list_hours."""
-    uri = '{}/files/{}/{}'.format(node, stream, variant)
+    uri = '{}/files/{}/{}'.format(node, channel, quality)
     logging.debug('Getting list of hours from {}'.format(uri))
     resp = requests.get(uri, timeout=timeout)
     return common.encode_strings(resp.json())
 
-def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
+def list_remote_segments(node, channel, quality, hour, timeout=TIMEOUT):
     """Wrapper around a call to restreamer.list_segments."""
-    uri = '{}/files/{}/{}/{}'.format(node, stream, variant, hour)
+    uri = '{}/files/{}/{}/{}'.format(node, channel, quality, hour)
     logging.debug('Getting list of segments from {}'.format(uri))
     resp = requests.get(uri, timeout=timeout)
    return common.encode_strings(resp.json())
 
-def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
+def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
         timeout=TIMEOUT):
     """Get a segment from a node.
 
-    Fetches stream/variant/hour/missing_segment from node and puts it in
-    base_dir/stream/variant/hour/missing_segment. If the segment already exists
+    Fetches channel/quality/hour/missing_segment from node and puts it in
+    base_dir/channel/quality/hour/missing_segment. If the segment already exists
     locally, this does not attempt to fetch it."""
 
-    path = os.path.join(base_dir, stream, variant, hour, missing_segment)
+    path = os.path.join(base_dir, channel, quality, hour, missing_segment)
     # check to see if file was created since we listed the local segments to
     # avoid unnecessarily copying
     if os.path.exists(path):
@@ -90,8 +90,8 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
     try:
         logging.debug('Fetching segment {} from {}'.format(path, node))
-        uri = '{}/segments/{}/{}/{}/{}'.format(node, stream, variant, hour, missing_segment)
+        uri = '{}/segments/{}/{}/{}/{}'.format(node, channel, quality, hour, missing_segment)
         resp = requests.get(uri, stream=True, timeout=timeout)
         resp.raise_for_status()
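
Note that stream=True in the requests.get call above is the requests keyword argument that defers downloading the response body; it is unrelated to the renamed stream variable, so the rename must leave it untouched. A minimal sketch of the streamed fetch-then-rename pattern the surrounding function uses, with an illustrative temp-file suffix and chunk size:

import os
import requests

def fetch_to_file(uri, path, timeout=5):
    # Download into a temp file so a partial download never appears at
    # the final path, then rename into place once complete.
    temp_path = path + '.temp'
    resp = requests.get(uri, stream=True, timeout=timeout)  # requests kwarg, not the channel
    resp.raise_for_status()
    with open(temp_path, 'wb') as f:
        for chunk in resp.iter_content(chunk_size=8192):
            f.write(chunk)
    os.rename(temp_path, path)  # the real code goes through common.rename()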
@@ -106,13 +106,13 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
         raise
 
     logging.debug('Saving completed segment {} as {}'.format(temp_path, path))
     common.rename(temp_path, path)
-    segments_backfilled.labels(remote=node, stream=stream, variant=variant, hour=hour).inc()
+    segments_backfilled.labels(remote=node, channel=channel, quality=quality, hour=hour).inc()
 
-def list_hours(node, stream, variants, start=None):
+def list_hours(node, channel, qualities, start=None):
     """Return a list of all available hours from a node.
 
-    List all hours available from node/stream for each variant in variants
+    List all hours available from node/channel for each quality in qualities
     ordered from newest to oldest.
 
     Keyword arguments:
@@ -120,7 +120,7 @@ def list_hours(node, stream, variants, start=None):
     return hours more recent than that number of hours ago. If None (default),
     all hours are returned."""
 
-    hour_lists = [list_remote_hours(node, stream, variant) for variant in variants]
+    hour_lists = [list_remote_hours(node, channel, quality) for quality in qualities]
     hours = list(set().union(*hour_lists))
     hours.sort(reverse=True) #latest hour first
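
The union-then-sort idiom above deduplicates hours across qualities before ordering them newest first. A small illustration, assuming ISO-style hour directory names (the values here are made up), where reverse lexicographic order is also reverse chronological:

hour_lists = [
    ['2019-11-09T21', '2019-11-09T20'],  # hours listed for 'source'
    ['2019-11-09T21', '2019-11-09T19'],  # hours listed for '480p'
]
hours = list(set().union(*hour_lists))  # deduplicate across qualities
hours.sort(reverse=True)                # latest hour first
print(hours)  # ['2019-11-09T21', '2019-11-09T20', '2019-11-09T19']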
@@ -142,15 +142,15 @@ class BackfillerManager(object):
     NODE_INTERVAL = 300 #seconds between updating list of nodes
 
-    def __init__(self, base_dir, stream, variants, static_nodes=[], start=None,
-            run_once=False, node_file=None, node_database=None, localhost=None,
-            download_concurrency=5, recent_cutoff=120):
+    def __init__(self, base_dir, channel, qualities, static_nodes=[],
+            start=None, run_once=False, node_file=None, node_database=None,
+            localhost=None, download_concurrency=5, recent_cutoff=120):
         """Constructor for BackfillerManager.
 
-        Creates a manager for a given stream with specified variants. If
-        static_nodes is None, manager"""
+        Creates a manager for a given channel with specified qualities."""
         self.base_dir = base_dir
-        self.stream = stream
-        self.variants = variants
+        self.channel = channel
+        self.qualities = qualities
         self.static_nodes = static_nodes
         self.start = start
         self.run_once = run_once
@@ -162,7 +162,7 @@ class BackfillerManager(object):
         self.download_concurrency = download_concurrency
         self.recent_cutoff = recent_cutoff
         self.stopping = gevent.event.Event()
-        self.logger = logging.getLogger("BackfillerManager({})".format(stream))
+        self.logger = logging.getLogger("BackfillerManager({})".format(channel))
         self.workers = {} # {node url: worker}
 
     def stop(self):
@@ -276,8 +276,8 @@ class BackfillerManager(object):
 class BackfillerWorker(object):
     """Backfills segments from a node.
 
-    Backfills every WAIT_INTERVAL all segments from node/stream to
-    base_dir/stream for all variants. If run_once, only backfill once.
+    Backfills every WAIT_INTERVAL all segments from node/channel to
+    base_dir/channel for all qualities. If run_once, only backfill once.
 
     recent_cutoff -- Skip backfilling segments younger than this number of
     seconds to prioritise letting the downloader grab these segments.
@@ -291,8 +291,8 @@ class BackfillerWorker(object):
         self.base_dir = manager.base_dir
         self.node = node
         self.download_concurrency = manager.download_concurrency
-        self.stream = manager.stream
-        self.variants = manager.variants
+        self.channel = manager.channel
+        self.qualities = manager.qualities
         self.start = manager.start
         self.run_once = manager.run_once
         self.recent_cutoff = manager.recent_cutoff
@@ -300,7 +300,7 @@ class BackfillerWorker(object):
         self.done = gevent.event.Event()
 
     def __repr__(self):
-        return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream)
+        return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.channel)
     __str__ = __repr__
 
     def stop(self):
@@ -311,17 +311,17 @@ class BackfillerWorker(object):
     def backfill(self, hours):
         """Backfill from remote node.
 
-        Backfill from node/stream/variants to base_dir/stream/variants for each
-        hour in hours.
+        Backfill from node/channel/qualities to base_dir/channel/qualities for
+        each hour in hours.
         """
-        for variant in self.variants:
+        for quality in self.qualities:
             for hour in hours:
-                self.logger.debug('Backfilling {}/{}'.format(variant, hour))
+                self.logger.debug('Backfilling {}/{}'.format(quality, hour))
 
-                local_segments = set(list_local_segments(self.base_dir, self.stream, variant, hour))
-                remote_segments = set(list_remote_segments(self.node, self.stream, variant, hour))
+                local_segments = set(list_local_segments(self.base_dir, self.channel, quality, hour))
+                remote_segments = set(list_remote_segments(self.node, self.channel, quality, hour))
                 missing_segments = list(remote_segments - local_segments)
 
                 # randomise the order of the segments to reduce the chance that
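
The set difference above is all the logic needed to find what this node has that we lack; the randomisation the trailing comment refers to presumably spreads simultaneous backfillers across different segments. A self-contained illustration with invented segment names:

import random

local_segments = {'seg-001.ts', 'seg-002.ts'}
remote_segments = {'seg-001.ts', 'seg-002.ts', 'seg-003.ts', 'seg-004.ts'}

missing_segments = list(remote_segments - local_segments)
random.shuffle(missing_segments)  # so concurrent backfillers don't all fetch in the same order
print(missing_segments)  # some ordering of ['seg-003.ts', 'seg-004.ts']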
@@ -336,7 +336,7 @@ class BackfillerWorker(object):
                     if self.stopping.is_set():
                         return
-                    path = os.path.join(self.stream, variant, hour, missing_segment)
+                    path = os.path.join(self.channel, quality, hour, missing_segment)
 
                     # test to see if file is a segment and get the segments start time
                     try:
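
The try block cut off by the next hunk boundary parses each segment's start time so that segments younger than recent_cutoff can be skipped. A hedged sketch of such an age check; the filename format and parser here are hypothetical stand-ins for whatever the repo's common module actually provides:

import datetime

TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'  # assumed prefix of segment filenames

def segment_age_seconds(name):
    # Hypothetical: treat the first 23 characters as a UTC timestamp,
    # e.g. '2019-11-09T21:05:30.123-2.0-abcdef.ts'.
    start = datetime.datetime.strptime(name[:23], TIMESTAMP_FORMAT)
    return (datetime.datetime.utcnow() - start).total_seconds()

def should_defer(name, recent_cutoff=120):
    # Too young: leave it for the downloader rather than backfilling it.
    return segment_age_seconds(name) < recent_cutoff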
@@ -359,7 +359,7 @@ class BackfillerWorker(object):
                     # start segment as soon as a pool slot opens up, then track it in workers
                     workers.append(pool.spawn(
                         get_remote_segment,
-                        self.base_dir, self.node, self.stream, variant, hour, missing_segment
+                        self.base_dir, self.node, self.channel, quality, hour, missing_segment
                     ))
 
                 # verify that all the workers succeeded. if any failed, raise the exception from
@@ -367,7 +367,7 @@ class BackfillerWorker(object):
                 for worker in workers:
                     worker.get() # re-raise error, if any
-                self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), variant, hour))
+                self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), quality, hour))
 
     def run(self):
         self.logger.info('Starting')
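
The pool.spawn / worker.get() pattern above bounds concurrency at download_concurrency per node and surfaces the first failure after all greenlets have been started. A minimal standalone sketch of the same pattern, with a stand-in fetch function in place of get_remote_segment:

import gevent.pool

def fetch(n):
    # Stand-in for get_remote_segment; fails for one input to show the re-raise.
    if n == 3:
        raise ValueError('download failed')
    return n

pool = gevent.pool.Pool(5)  # at most 5 concurrent fetches
workers = [pool.spawn(fetch, n) for n in range(6)]
for worker in workers:
    worker.get()  # blocks until that greenlet finishes; re-raises its exception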
@@ -376,7 +376,7 @@ class BackfillerWorker(object):
         while not self.stopping.is_set():
             try:
-                self.backfill(list_hours(self.node, self.stream, self.variants, self.start))
+                self.backfill(list_hours(self.node, self.channel, self.qualities, self.start))
                 failures = 0 #reset failure count on a successful backfill
                 if not self.run_once:
                     self.stopping.wait(common.jitter(self.WAIT_INTERVAL))
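
On success the worker sleeps for a jittered WAIT_INTERVAL; on failure (handled past this hunk) it backs off, with MAX_BACKOFF from the top of the file capping the growth. common.jitter is not shown in this diff, so both bodies below are assumptions about plausible implementations, not the repo's code:

import random

TIMEOUT = 5
MAX_BACKOFF = 4

def jitter(interval):
    # Assumed: scale the interval by a small random factor so workers
    # started together don't all wake at the same moment.
    return interval * (0.9 + 0.2 * random.random())

def backoff_delay(failures):
    # Assumed: exponential backoff capped at 2**MAX_BACKOFF * TIMEOUT seconds.
    return jitter(TIMEOUT * 2 ** min(failures, MAX_BACKOFF))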
@@ -396,9 +396,9 @@ class BackfillerWorker(object):
             if self.node in self.manager.workers:
                 del self.manager.workers[self.node]
 
-@argh.arg("streams", nargs="*")
+@argh.arg('channels', nargs='*', help='Channels to backfill from')
 @argh.arg('--base-dir', help='Directory to which segments will be backfilled. Default is current working directory.')
-@argh.arg('--variants', help="Variants of each stream to backfill. Comma seperated if multiple. Default is 'source'.")
+@argh.arg('--qualities', help="Qualities of each channel to backfill. Comma separated if multiple. Default is 'source'.")
 @argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8002.')
 @argh.arg('--static-nodes', help='Nodes to always backfill from. Comma seperated if multiple. By default empty.')
 @argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
@@ -409,14 +409,14 @@ class BackfillerWorker(object):
 @argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()')
 @argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.')
 @argh.arg('--recent-cutoff', help='Minimum age for a segment before we will backfill it, to prevent us backfilling segments we could have just downloaded ourselves instead. Expressed as number of seconds.')
-def main(streams, base_dir='.', variants='source', metrics_port=8002,
+def main(channels, base_dir='.', qualities='source', metrics_port=8002,
         static_nodes='', backdoor_port=0, start=None, run_once=False,
         node_file=None, node_database=None, localhost=socket.gethostname(),
         download_concurrency=5, recent_cutoff=120):
     """Backfiller service."""
-    variants = variants.split(',') if variants else []
-    variants = [variant.strip() for variant in variants]
+    qualities = qualities.split(',') if qualities else []
+    qualities = [quality.strip() for quality in qualities]
     static_nodes = static_nodes.split(',') if static_nodes else []
     static_nodes = [static_node.strip() for static_node in static_nodes]
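
The two-step split-and-strip turns the CLI's comma-separated string into a clean list, tolerating stray whitespace around the commas; for example (values illustrative):

qualities = 'source, 480p ,1080p'
qualities = qualities.split(',') if qualities else []
qualities = [quality.strip() for quality in qualities]
print(qualities)  # ['source', '480p', '1080p']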
@@ -434,9 +434,9 @@ def main(streams, base_dir='.', variants='source', metrics_port=8002,
     managers = []
     workers = []
-    for stream in streams:
-        logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir))
-        manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency, recent_cutoff)
+    for channel in channels:
+        logging.info('Starting backfilling {} with {} as qualities to {}'.format(channel, ', '.join(qualities), base_dir))
+        manager = BackfillerManager(base_dir, channel, qualities, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency, recent_cutoff)
         managers.append(manager)
         workers.append(gevent.spawn(manager.run))
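
For reference, a hedged sketch of driving the renamed API directly; the module path, channel name, and node URL are all illustrative, not from the repo:

import gevent
from backfiller import BackfillerManager  # module name assumed

manager = BackfillerManager(
    '/segments',           # base_dir
    'desertbus',           # channel
    ['source', '480p'],    # qualities
    static_nodes=['http://wubloader-node-1'],
    run_once=True,
)
gevent.spawn(manager.run).get()  # one backfill pass, wait for completion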