fixes based on ekimekim's suggestions

pull/67/head
Christopher Usher 5 years ago
parent 732c56d502
commit 361e577474

@@ -91,7 +91,7 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
     try:
         logging.debug('Fetching segment {} from {}'.format(path, node))
         uri = '{}/segments/{}/{}/{}/{}'.format(node, channel, quality, hour, missing_segment)
-        resp = requests.get(uri, channel=True, timeout=timeout)
+        resp = requests.get(uri, stream=True, timeout=timeout)
         resp.raise_for_status()
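This hunk restores requests' `stream=True` flag, which an earlier stream-to-quality rename had clobbered into the nonexistent keyword `channel=True` (a TypeError at call time). With `stream=True`, requests defers downloading the response body so it can be written to disk chunk by chunk instead of buffered in memory. A minimal sketch of that usage, with placeholder uri and path:

# Minimal sketch of streaming a segment to disk; uri/path are placeholders.
import requests

def fetch_to_file(uri, path, timeout=5, chunk_size=16 * 1024):
    resp = requests.get(uri, stream=True, timeout=timeout)  # body not read yet
    resp.raise_for_status()
    with open(path, 'wb') as f:
        for chunk in resp.iter_content(chunk_size=chunk_size):
            f.write(chunk)  # download happens here, one chunk at a time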

@@ -77,12 +77,12 @@ def soft_hard_timeout(logger, description, (soft_timeout, hard_timeout), on_soft
 class StreamsManager(object):
-    """Keeps track of what streams are being downloaded and the workers doing so.
+    """Keeps track of what qualities are being downloaded and the workers doing so.
     Re-fetches master playlist when needed and starts new stream workers.
     This is all to ensure that broken or bad media playlist urls are refreshed
     in a timely manner.
-    The stream_workers map lists workers for each stream. Generally there should only be
+    The stream_workers map lists workers for each quality. Generally there should only be
     one, but during switchover there may be 2 - one old one continuing to (try to) operate while
     the second one confirms it's working. While trying to get a url working, it won't retry, it'll
     just ask the manager immediately to create yet another new worker then quit.
@@ -94,11 +94,11 @@ class StreamsManager(object):
     Creation of a new stream worker may be triggered by:
     * An existing worker failing to refresh its playlist
-    * The master playlist finding a stream url it doesn't have already
+    * The master playlist finding a quality url it doesn't have already
     * A worker is older than MAX_WORKER_AGE
     The master playlist is only re-fetched as needed. Reasons to re-fetch:
-    * No url is known for at least one stream
+    * No url is known for at least one quality
     * Someone has requested it (eg. because the previous attempt failed, or a worker needs a new url)
     * A worker is older than MAX_WORKER_AGE
     """
@@ -111,8 +111,8 @@ class StreamsManager(object):
         self.channel = channel
         self.logger = logging.getLogger("StreamsManager({})".format(channel))
         self.base_dir = base_dir
-        self.stream_workers = {name: [] for name in qualities} # {stream name: [workers]}
-        self.latest_urls = {} # {stream name: (fetch time, url)}
+        self.stream_workers = {name: [] for name in qualities} # {quality name: [workers]}
+        self.latest_urls = {} # {quality name: (fetch time, url)}
         self.latest_urls_changed = gevent.event.Event() # set when latest_urls changes
         self.refresh_needed = gevent.event.Event() # set to tell main loop to refresh now
         self.stopping = gevent.event.Event() # set to tell main loop to stop
@@ -120,7 +120,7 @@ class StreamsManager(object):
     def mark_working(self, worker):
         """Notify the manager that the given worker is up and running,
         and any older workers are safe to stop."""
-        workers = self.stream_workers[worker.stream]
+        workers = self.stream_workers[worker.quality]
         if worker not in workers:
             self.logger.warning("Worker {} called mark_working() but wasn't in known list: {}".format(worker, workers))
             return
@@ -129,21 +129,21 @@ class StreamsManager(object):
             old.stop()
     def trigger_new_worker(self, worker):
-        """Called when a worker decides a new worker for a stream is needed, eg. if it seems to be
+        """Called when a worker decides a new worker for a quality is needed, eg. if it seems to be
         failing. Causes a new worker with a fresh url to be created.
         If worker's url is the same as the latest url, blocks until a new url has been fetched.
         This only has effect if the worker is the current latest worker.
         """
-        workers = self.stream_workers[worker.stream]
+        workers = self.stream_workers[worker.quality]
         if worker not in workers:
             self.logger.warning("Worker {} called trigger_new_worker() but wasn't in known list: {}".format(worker, workers))
             return
         if worker is not workers[-1]:
-            self.logger.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.stream))
+            self.logger.info("Ignoring request to start new worker for {} as old one is not latest".format(worker.quality))
             return
-        self.logger.info("Starting new worker for {} by request of old worker".format(worker.stream))
-        self.wait_for_new_url(worker.stream, worker.url)
-        self.start_worker(worker.stream)
+        self.logger.info("Starting new worker for {} by request of old worker".format(worker.quality))
+        self.wait_for_new_url(worker.quality, worker.url)
+        self.start_worker(worker.quality)
         self.trigger_refresh()
     def trigger_refresh(self):
@@ -160,10 +160,10 @@ class StreamsManager(object):
         self.latest_urls_changed.set()
         self.latest_urls_changed = gevent.event.Event()
-    def wait_for_new_url(self, stream, old_url):
+    def wait_for_new_url(self, quality, old_url):
         """Trigger urls to be re-fetched, and block until a different one is received."""
         while True:
-            new_time, new_url = self.latest_urls[stream]
+            new_time, new_url = self.latest_urls[quality]
             if new_url != old_url:
                 return
             self.logger.info("Triggering master playlist refresh as we need a new url")
@@ -175,16 +175,16 @@ class StreamsManager(object):
         self.logger.info("Stopping")
         self.stopping.set()
-    def start_worker(self, stream):
-        """Start a new worker for given stream"""
+    def start_worker(self, quality):
+        """Start a new worker for given quality"""
         # it's possible for fetch_latest to call us after we've started stopping,
         # in that case do nothing.
         if self.stopping.is_set():
             self.logger.info("Ignoring worker start as we are stopping")
             return
-        url_time, url = self.latest_urls[stream]
-        worker = StreamWorker(self, stream, url, url_time)
-        self.stream_workers[stream].append(worker)
+        url_time, url = self.latest_urls[quality]
+        worker = StreamWorker(self, quality, url, url_time)
+        self.stream_workers[quality].append(worker)
         gevent.spawn(worker.run)
     def fetch_latest(self):
@@ -197,20 +197,20 @@ class StreamsManager(object):
             master_playlist = twitch.get_master_playlist(self.channel)
             new_urls = twitch.get_media_playlist_uris(master_playlist, self.stream_workers.keys())
             self.update_urls(fetch_time, new_urls)
-            for stream, workers in self.stream_workers.items():
+            for quality, workers in self.stream_workers.items():
                 # warn and retry if the url is missing
-                if stream not in new_urls:
-                    self.logger.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(stream))
+                if quality not in new_urls:
+                    self.logger.warning("Stream {} could not be found in latest master playlist, re-queueing refresh".format(quality))
                     self.trigger_refresh()
                 # is it newly found?
-                if not workers and stream in self.latest_urls:
-                    self.logger.info("Starting new worker for {} as none exist".format(stream))
-                    self.start_worker(stream)
+                if not workers and quality in self.latest_urls:
+                    self.logger.info("Starting new worker for {} as none exist".format(quality))
+                    self.start_worker(quality)
                 latest_worker = workers[-1]
                 # is the old worker too old?
                 if latest_worker.age() > self.MAX_WORKER_AGE:
-                    self.logger.info("Starting new worker for {} as the latest is too old ({}h)".format(stream, latest_worker.age() / 3600.))
-                    self.start_worker(stream)
+                    self.logger.info("Starting new worker for {} as the latest is too old ({}h)".format(quality, latest_worker.age() / 3600.))
+                    self.start_worker(quality)
         except Exception as e:
             if isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 404:
                 self.logger.info("Stream is not up. Retrying.")
@@ -266,10 +266,10 @@ class StreamWorker(object):
     FETCH_RETRY_INTERVAL = 1
     FETCH_POLL_INTERVAL = 2
-    def __init__(self, manager, stream, url, url_time):
+    def __init__(self, manager, quality, url, url_time):
         self.manager = manager
-        self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(stream, id(self)))
-        self.stream = stream
+        self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self)))
+        self.quality = quality
         self.url = url
         self.url_time = url_time
         self.stopping = gevent.event.Event() # set to stop main loop
@@ -359,7 +359,7 @@ class StreamWorker(object):
                 self.session,
                 self.manager.base_dir,
                 self.manager.channel,
-                self.stream,
+                self.quality,
                 segment,
                 date,
             )
@@ -411,11 +411,11 @@ class SegmentGetter(object):
     # or so, to be paranoid we set it to considerably longer than that.
     GIVE_UP_TIMEOUT = 20 * 60
-    def __init__(self, parent_logger, session, base_dir, channel, stream, segment, date):
+    def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date):
         self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
         self.base_dir = base_dir
         self.channel = channel
-        self.stream = stream
+        self.quality = quality
         self.segment = segment
         self.date = date
         self.prefix = self.make_path_prefix()
@@ -466,7 +466,7 @@ class SegmentGetter(object):
         return os.path.join(
             self.base_dir,
             self.channel,
-            self.stream,
+            self.quality,
             self.date.strftime("%Y-%m-%dT%H"),
             "{date}-{duration}".format(
                 date=self.date.strftime("%M:%S.%f"),
@@ -541,16 +541,16 @@ class SegmentGetter(object):
             partial_path = self.make_path("partial", hash)
             self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path))
             common.rename(temp_path, partial_path)
-            segments_downloaded.labels(partial="True", channel=self.channel, quality=self.stream).inc()
+            segments_downloaded.labels(partial="True", channel=self.channel, quality=self.quality).inc()
             raise ex_type, ex, tb
         else:
             full_path = self.make_path("full", hash)
             self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
             common.rename(temp_path, full_path)
-            segments_downloaded.labels(partial="False", channel=self.channel, quality=self.stream).inc()
+            segments_downloaded.labels(partial="False", channel=self.channel, quality=self.quality).inc()
             # Prom doesn't provide a way to compare value to gauge's existing value,
             # we need to reach into internals
-            stat = latest_segment.labels(channel=self.channel, quality=self.stream)
+            stat = latest_segment.labels(channel=self.channel, quality=self.quality)
             timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds()
             stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe
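The last three context lines implement a monotonic-max gauge: prometheus_client offers no compare-and-set, so the code reads the gauge's internal _value and sets the larger of the two. A standalone sketch of the same trick (the metric name is illustrative; _value is an internal, unsupported API):

# Sketch of the gauge-max trick; _value is prometheus_client internals.
# Gevent-safe here because nothing in the expression yields to another greenlet.
from prometheus_client import Gauge

latest_example = Gauge('latest_example_timestamp', 'Example monotonic-max gauge')

def set_if_newer(gauge, timestamp):
    # emulate a compare-and-set-max, which Gauge does not provide
    gauge.set(max(gauge._value.get(), timestamp))

set_if_newer(latest_example, 1573267200.0)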

@@ -160,7 +160,7 @@ def time_range_for_quality(channel, quality):
 def generate_master_playlist(channel):
     """Returns a HLS master playlist for the given channel.
     Takes optional params:
-        start, end: The time to begin and end the channel at.
+        start, end: The time to begin and end the stream at.
     See generate_media_playlist for details.
     """
     start = common.dateutil.parse_utc_only(request.args['start']) if 'start' in request.args else None
@@ -190,7 +190,7 @@ def generate_master_playlist(channel):
 def generate_media_playlist(channel, quality):
     """Returns a HLS media playlist for the given channel and quality.
     Takes optional params:
-        start, end: The time to begin and end the channel at.
+        start, end: The time to begin and end the stream at.
             Must be in ISO 8601 format (ie. yyyy-mm-ddTHH:MM:SS) and UTC.
             If not given, effectively means "infinity", ie. no start means
             any time ago, no end means any time in the future.
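For illustration, a request using the documented start/end parameters (the host and route here are hypothetical; only the ISO 8601 UTC format comes from the docstring):

# Hypothetical host and route; the start/end format is the documented one.
import requests

resp = requests.get(
    'http://localhost:8080/playlist/examplechannel/source.m3u8',
    params={
        'start': '2019-11-09T01:23:45',  # ISO 8601, UTC
        'end': '2019-11-09T02:30:00',
    },
)
print(resp.text)  # the generated HLS media playlist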
