@@ -21,7 +21,7 @@ import common
 import common.dateutil
 import common.requests
 
-from .providers import URLProvider, TwitchProvider, YoutubeProvider
+from .providers import URLProvider, TwitchProvider, YoutubeProvider, LocalProvider
 
 
 segments_downloaded = prom.Counter(
@@ -384,16 +384,25 @@ class StreamWorker(object):
 				if segment.uri not in self.getters:
 					if date is None:
 						raise ValueError("Cannot determine date of segment")
-					self.getters[segment.uri] = SegmentGetter(
-						self.logger,
-						self.session,
-						self.manager.base_dir,
-						self.manager.channel,
-						self.quality,
-						segment,
-						date,
-						self.map_cache,
-					)
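+					# The local provider's segments are already files on disk, so link them into place instead of fetching over HTTP.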
+					if isinstance(self.manager.provider, LocalProvider):
+						self.getters[segment.uri] = SegmentLinker(
+							self.logger,
+							self.manager.base_dir,
+							self.manager.channel,
+							segment,
+							date,
+						)
+					else:
+						self.getters[segment.uri] = SegmentGetter(
+							self.logger,
+							self.session,
+							self.manager.base_dir,
+							self.manager.channel,
+							self.quality,
+							segment,
+							date,
+							self.map_cache,
+						)
 					gevent.spawn(self.getters[segment.uri].run)
 				if date is not None:
 					date += datetime.timedelta(seconds=segment.duration)
@@ -649,6 +658,9 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
 		elif type == "youtube":
 			provider = YoutubeProvider(url)
 			channel_qualities = ["source"]
+		elif type == "local":
+			provider = LocalProvider(url)
+			channel_qualities = ["source"]
 		else:
 			raise ValueError(f"Unknown type {type!r}")
 		manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important)
@@ -687,3 +699,69 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
 		worker.get() # re-raise error if failed
 
 	logging.info("Gracefully stopped")
+
+class SegmentLinker(SegmentGetter):
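+	"""Like SegmentGetter, but the segment is already a file on local disk, so we symlink it into place instead of downloading it."""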
+	def __init__(self, parent_logger, base_dir, channel, segment, date):
+		self.logger = parent_logger.getChild("SegmentLinker@{:x}".format(id(self)))
+		self.base_dir = base_dir
+		self.channel = channel
+		self.segment = segment
+		self.date = date
+		self.prefix = self.make_path_prefix()
+		self.retry = None
+		self.done = gevent.event.Event()
+		# Unlike SegmentGetter, we don't need an HTTP session: the segment is already local.
+
+	def exists(self):
+		"""Look for an existing link to the segment. Return bool."""
+		dirname = os.path.dirname(self.prefix)
+		try:
+			candidates = os.listdir(dirname)
+		except OSError as e:
+			# on ENOENT (doesn't exist), return false
+			if e.errno != errno.ENOENT:
+				raise
+			return False
+		full_prefix = "{}-full".format(self.prefix)
+		return any(
+			os.path.join(dirname, candidate).startswith(full_prefix)
+			# There's almost no way a matching tombstone could already exist, but just in case
+			# we'll make sure it isn't counted.
+			and not candidate.endswith(".tombstone")
+			for candidate in candidates
+		)
+
+	def make_path_prefix(self):
+		return os.path.join(
+			self.base_dir,
+			'local',
+			'source',
+			self.date.strftime("%Y-%m-%dT%H"),
+			"{date}-{duration}".format(
+				date=self.date.strftime("%M:%S.%f"),
+				duration=self.segment.duration,
+			),
+		)
+
+	def make_path(self, hash=None):
+		"""Generate filepath for the segment.
+		Hash is the hash of the segment contents, or None to use a random UUID instead."""
+		arg = b64encode(hash.digest(), b"-_").decode().rstrip("=") if hash else str(uuid.uuid4())
+		return "{}-full-{}.ts".format(self.prefix, arg)
+
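+	# Unlike SegmentGetter, there is nothing to download: hash the already-local file to get a stable name, then symlink it into place.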
+	def _get_segment(self):
+		hash = hashlib.sha256()
+		with open(self.segment.uri, 'rb') as f:
+			while chunk := f.read(8192):
+				hash.update(chunk)
+
+		dest_path = self.make_path(hash)
+		common.ensure_directory(dest_path)
+		self.logger.debug("Linking segment: {} to {}".format(dest_path, self.segment.uri))
+		os.symlink(self.segment.uri, dest_path)
+
+		segments_downloaded.labels(type="full", channel=self.channel, quality="source").inc()
+		segment_duration_downloaded.labels(type="full", channel=self.channel, quality="source").inc(self.segment.duration)
+		stat = latest_segment.labels(channel=self.channel, quality="source")
+		timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds()
+		stat.set(max(stat._value.get(), timestamp))