@ -21,7 +21,7 @@ import common
import common . dateutil
import common . requests
from . providers import URLProvider , TwitchProvider , YoutubeProvider
from . providers import URLProvider , TwitchProvider , YoutubeProvider , LocalProvider
segments_downloaded = prom . Counter (
@ -384,16 +384,25 @@ class StreamWorker(object):
if segment . uri not in self . getters :
if date is None :
raise ValueError ( " Cannot determine date of segment " )
self . getters [ segment . uri ] = SegmentGetter (
self . logger ,
self . session ,
self . manager . base_dir ,
self . manager . channel ,
self . quality ,
segment ,
date ,
self . map_cache ,
)
if isinstance ( self . manager . provider , LocalProvider ) :
self . getters [ segment . uri ] = SegmentLinker (
self . logger ,
self . manager . base_dir ,
self . manager . channel ,
segment ,
date ,
)
else :
self . getters [ segment . uri ] = SegmentGetter (
self . logger ,
self . session ,
self . manager . base_dir ,
self . manager . channel ,
self . quality ,
segment ,
date ,
self . map_cache ,
)
gevent . spawn ( self . getters [ segment . uri ] . run )
if date is not None :
date + = datetime . timedelta ( seconds = segment . duration )
@ -649,6 +658,9 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
elif type == " youtube " :
provider = YoutubeProvider ( url )
channel_qualities = [ " source " ]
elif type == " local " :
provider = LocalProvider ( url )
channel_qualities = [ " source " ]
else :
raise ValueError ( f " Unknown type { type !r} " )
manager = StreamsManager ( provider , channel , base_dir , channel_qualities , important = important )
@ -687,3 +699,50 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
worker . get ( ) # re-raise error if failed
logging . info ( " Gracefully stopped " )
class SegmentLinker ( SegmentGetter ) :
def __init__ ( self , parent_logger , base_dir , channel , segment , date ) :
self . logger = parent_logger . getChild ( " SegmentLinker@ {:x} " . format ( id ( self ) ) )
self . base_dir = base_dir
self . channel = channel
self . segment = segment
self . date = date
self . prefix = self . make_path_prefix ( )
self . retry = None
self . done = gevent . event . Event ( )
# Our parent's connection pool, but we'll replace it if there's any issues
def make_path_prefix ( self ) :
return os . path . join (
self . base_dir ,
' local ' ,
' source ' ,
self . date . strftime ( " % Y- % m- %d T % H " ) ,
" {date} - {duration} " . format (
date = self . date . strftime ( " % M: % S. %f " ) ,
duration = self . segment . duration ,
) ,
)
def make_path ( self , hash = None ) :
""" Generate filepath for the segment.
"""
arg = b64encode ( hash . digest ( ) , b " -_ " ) . decode ( ) . rstrip ( " = " ) if hash else str ( uuid . uuid4 ( ) )
return " {} -full- {} .ts " . format ( self . prefix , arg )
def _get_segment ( self ) :
hash = hashlib . sha256 ( )
with open ( self . segment . uri , ' rb ' ) as f :
while chunk := f . read ( 8192 ) :
hash . update ( chunk )
dest_path = self . make_path ( hash )
common . ensure_directory ( dest_path )
self . logger . debug ( " Linking segment: {} to {} " . format ( dest_path , self . segment . uri ) )
os . symlink ( self . segment . uri , dest_path )
segments_downloaded . labels ( type = " full " , channel = self . channel , quality = " source " ) . inc ( )
segment_duration_downloaded . labels ( type = " full " , channel = self . channel , quality = " source " ) . inc ( self . segment . duration )
stat = latest_segment . labels ( channel = self . channel , quality = " source " )
timestamp = ( self . date - datetime . datetime ( 1970 , 1 , 1 ) ) . total_seconds ( )
stat . set ( max ( stat . _value . get ( ) , timestamp ) )