@@ -90,6 +90,7 @@ class StreamsManager(object):
 
 	def __init__(self, channel, base_dir, qualities):
 		self.channel = channel
+		self.logger = logging.getLogger("StreamsManager({})".format(channel))
 		self.base_dir = base_dir
 		self.stream_workers = {name: [] for name in qualities + ["source"]} # {stream name: [workers]}
 		self.latest_urls = {} # {stream name: (fetch time, url)}
@@ -102,7 +103,7 @@ class StreamsManager(object):
 		and any older workers are safe to stop."""
 		workers = self.stream_workers[worker.stream]
 		if worker not in workers:
-			logging.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers))
+			self.logger.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers))
 			return
 		# stop everything older than given worker
 		for old in workers[:workers.index(worker)]:
@@ -116,12 +117,12 @@ class StreamsManager(object):
 		"""
 		workers = self.stream_workers[worker.stream]
 		if worker not in workers:
-			logging.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers))
+			self.logger.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers))
 			return
 		if worker is not workers[-1]:
-			logging.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream))
+			self.logger.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream))
 			return
-		logging.info("Starting new worker for {} by request of old worker".format(worker.stream))
+		self.logger.info("Starting new worker for {} by request of old worker".format(worker.stream))
 		self.wait_for_new_url(worker.stream, worker.url)
 		self.start_worker(worker.stream)
 		self.trigger_refresh()
@@ -146,13 +147,13 @@ class StreamsManager(object):
 			new_time, new_url = self.latest_urls[stream]
 			if new_url != old_url:
 				return
-			logging.info("Triggering master playlist refresh as we need a new url")
+			self.logger.info("Triggering master playlist refresh as we need a new url")
 			self.trigger_refresh()
 			self.latest_urls_changed.wait()
 
 	def stop(self):
 		"""Shut down all workers and stop capturing stream."""
-		logging.info("Stopping streams manager")
+		self.logger.info("Stopping")
 		self.stopping.set()
 
 	def start_worker(self, stream):
@@ -160,7 +161,7 @@ class StreamsManager(object):
 		# it's possible for fetch_latest to call us after we've started stopping,
 		# in that case do nothing.
 		if self.stopping.is_set():
-			logging.info("Ignoring worker start as we are stopping")
+			self.logger.info("Ignoring worker start as we are stopping")
 			return
 		url_time, url = self.latest_urls[stream]
 		worker = StreamWorker(self, stream, url, url_time)
@@ -171,7 +172,7 @@ class StreamsManager(object):
 		"""Re-fetch master playlist and start new workers if needed"""
 		try:
 			# Fetch playlist. On soft timeout, retry.
-			logging.info("Fetching master playlist")
+			self.logger.info("Fetching master playlist")
 			fetch_time = monotonic()
 			with soft_hard_timeout("fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh):
 				master_playlist = twitch.get_master_playlist(self.channel)
@@ -180,23 +181,23 @@ class StreamsManager(object):
 			for stream, workers in self.stream_workers.items():
 				# warn and retry if the url is missing
 				if stream not in new_urls:
-					logging.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(stream))
+					self.logger.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(stream))
 					self.trigger_refresh()
 				# is it newly found?
 				if not workers and stream in self.latest_urls:
-					logging.info("Starting new worker for {} as none exist".format(stream))
+					self.logger.info("Starting new worker for {} as none exist".format(stream))
 					self.start_worker(stream)
 				latest_worker = workers[-1]
 				# is the old worker too old?
 				if latest_worker.age() > self.MAX_WORKER_AGE:
-					logging.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.))
+					self.logger.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.))
 					self.start_worker(stream)
 		except Exception as e:
 			if isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 404:
-				logging.info("Stream is not up. Retrying.")
+				self.logger.info("Stream is not up. Retrying.")
 				self.trigger_refresh()
 			else:
-				logging.exception("Failed to fetch master playlist")
+				self.logger.exception("Failed to fetch master playlist")
 				# don't retry on hard timeout as we already retried on soft timeout
 				if not isinstance(e, TimedOutError):
 					self.trigger_refresh()
@@ -209,7 +210,7 @@ class StreamsManager(object):
 				self.MAX_WORKER_AGE - workers[-1].age()
 				for workers in self.stream_workers.values() if workers
 			] or [0]))
-			logging.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age))
+			self.logger.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age))
 			# wait until refresh triggered, next max age reached, or we're stopping (whichever happens first)
 			gevent.wait([self.stopping, self.refresh_needed], timeout=time_to_next_max_age, count=1)
 			if not self.stopping.is_set():
@@ -217,7 +218,7 @@ class StreamsManager(object):
 				gevent.spawn(self.fetch_latest)
 				# wait min retry interval with jitter, unless we're stopping
 				self.stopping.wait(common.jitter(self.FETCH_MIN_INTERVAL))
-		logging.info("Stopping workers")
+		self.logger.info("Stopping workers")
 		for workers in self.stream_workers.values():
 			for worker in workers:
 				worker.stop()
@@ -248,6 +249,7 @@ class StreamWorker(object):
 
 	def __init__(self, manager, stream, url, url_time):
 		self.manager = manager
+		self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(stream, id(self)))
 		self.stream = stream
 		self.url = url
 		self.url_time = url_time
@@ -268,14 +270,14 @@ class StreamWorker(object):
 		self.stopping.set()
 
 	def run(self):
-		logging.info("Worker {} starting".format(self))
+		self.logger.info("Worker starting")
 		try:
 			self._run()
 		except Exception:
-			logging.exception("Worker {} failed".format(self))
+			self.logger.exception("Worker failed")
 			self.trigger_new_worker()
 		else:
-			logging.info("Worker {} stopped".format(self))
+			self.logger.info("Worker stopped")
 		finally:
 			for getter in self.getters.values():
 				getter.done.wait()
@@ -293,18 +295,18 @@ class StreamWorker(object):
 		first = True
 		while not self.stopping.is_set():
 
-			logging.debug("{} getting media playlist {}".format(self, self.url))
+			self.logger.debug("Getting media playlist {}".format(self.url))
 			try:
 				with soft_hard_timeout("getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
 					playlist = twitch.get_media_playlist(self.url)
 			except Exception as e:
-				logging.warning("{} failed to fetch media playlist {}".format(self, self.url), exc_info=True)
+				self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True)
 				self.trigger_new_worker()
 				if first:
-					logging.warning("{} failed on first fetch, stopping".format(self))
+					self.logger.warning("Failed on first fetch, stopping")
 					self.stop()
 				elif isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 403:
-					logging.warning("{} failed with 403 Forbidden, stopping".format(self))
+					self.logger.warning("Failed with 403 Forbidden, stopping")
 					self.stop()
 				self.wait(self.FETCH_RETRY_INTERVAL)
 				continue
@@ -320,7 +322,7 @@ class StreamWorker(object):
 				if segment.uri not in self.getters:
 					if date is None:
 						raise ValueError("Cannot determine date of segment")
-					self.getters[segment.uri] = SegmentGetter(self.manager.base_dir, self.manager.channel, self.stream, segment, date)
+					self.getters[segment.uri] = SegmentGetter(self.logger, self.manager.base_dir, self.manager.channel, self.stream, segment, date)
 					gevent.spawn(self.getters[segment.uri].run)
 				if date is not None:
 					date += datetime.timedelta(seconds=segment.duration)
@@ -335,7 +337,7 @@ class StreamWorker(object):
 
 			# Stop if end-of-stream
 			if playlist.is_endlist:
-				logging.info("{} stopping due to end-of-playlist".format(self))
+				self.logger.info("Stopping due to end-of-playlist")
 				# Trigger a new worker for when the stream comes back up.
 				# In the short term this will cause some thrashing until the master playlist
 				# starts returning 404, but it's the best way to avoid missing anything
@@ -364,7 +366,8 @@ class SegmentGetter(object):
 	FETCH_HEADERS_TIMEOUTS = 5, 30
 	FETCH_FULL_TIMEOUTS = 15, 240
 
-	def __init__(self, base_dir, channel, stream, segment, date):
+	def __init__(self, parent_logger, base_dir, channel, stream, segment, date):
+		self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
 		self.base_dir = base_dir
 		self.channel = channel
 		self.stream = stream
@@ -380,7 +383,7 @@ class SegmentGetter(object):
 			try:
 				self._run()
 			except Exception:
-				logging.exception("Failure in SegmentGetter {}".format(self.segment))
+				self.logger.exception("Unexpected exception while getting segment {}, retrying".format(self.segment))
 				gevent.sleep(common.jitter(self.UNEXPECTED_FAILURE_RETRY))
 			else:
 				break
@@ -388,6 +391,7 @@ class SegmentGetter(object):
 		self.done.set()
 
 	def _run(self):
+		self.logger.debug("Getter started")
 		while not self.exists():
 			self.retry = gevent.event.Event()
 			worker = gevent.spawn(self.get_segment)
@@ -398,6 +402,7 @@ class SegmentGetter(object):
 				break
 			# if retry not set, wait for FETCH_RETRY first
 			self.retry.wait(common.jitter(self.FETCH_RETRY))
+		self.logger.debug("Getter is done")
 
 	def make_path_prefix(self):
 		"""Generate leading part of filepath which doesn't change with the hash."""
@@ -437,8 +442,6 @@ class SegmentGetter(object):
 
 
 	def get_segment(self):
-		# save current value of self.retry so we can't set any later instance
-		# after a retry for this round has already occurred.
 		try:
 			self._get_segment()
 		except Exception:
@@ -448,13 +451,16 @@ class SegmentGetter(object):
 		return True
 
 	def _get_segment(self):
+		# save current value of self.retry so we can't set any later instance
+		# after a retry for this round has already occurred.
+		retry = self.retry
 		temp_path = self.make_path("temp")
 		hash = hashlib.sha256()
 		file_created = False
 		try:
-			logging.debug("Getting segment {}".format(self.segment))
-			with soft_hard_timeout("getting and writing segment", self.FETCH_FULL_TIMEOUTS, self.retry.set):
-				with soft_hard_timeout("getting segment headers", self.FETCH_HEADERS_TIMEOUTS, self.retry.set):
+			self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path))
+			with soft_hard_timeout("getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set):
+				with soft_hard_timeout("getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set):
 					resp = requests.get(self.segment.uri, stream=True)
 					if resp.status_code == 403:
 						logging.warning("Got 403 Forbidden for segment, giving up: {}".format(self.segment))
@@ -474,12 +480,14 @@ class SegmentGetter(object):
 			# another exception in the interim
 			ex_type, ex, tb = sys.exc_info()
 			if file_created:
-				common.rename(temp_path, self.make_path("partial", hash))
+				partial_path = self.make_path("partial", hash)
+				self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path))
+				common.rename(temp_path, partial_path)
 			raise ex_type, ex, tb
 		else:
-			common.rename(temp_path, self.make_path("full", hash))
+			full_path = self.make_path("full", hash)
+			self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
+			common.rename(temp_path, full_path)
 
 
 def main(channel, base_dir=".", qualities=""):