restreamer: coalesce concurrent requests for the same playlist URL

pull/389/head
Mike Lang 1 year ago committed by Mike Lang
parent 28a468477d
commit a6d4f2466a

@@ -10,6 +10,7 @@ from uuid import uuid4
 import gevent
 import gevent.backdoor
+import gevent.event
 import prometheus_client as prom
 from flask import Flask, url_for, request, abort, Response
 from gevent.pywsgi import WSGIServer
@@ -232,6 +233,14 @@ def generate_master_playlist(channel):
     return generate_hls.generate_master(playlists)
 
+# Generating large media playlists is expensive, especially on the first run
+# where the cache is cold. And the video player will make repeated requests.
+# To avoid requests piling up and repeating work, if we get the exact same request again
+# while the old request is in progress, we piggyback on the previous request and return
+# the same result.
+# This cache object maps (hour_path, start, end) to an AsyncResult.
+_media_playlist_cache = {}
+
 
 @app.route('/playlist/<channel>/<quality>.m3u8')
 @request_stats
 @has_path_args
@@ -270,7 +279,22 @@ def generate_media_playlist(channel, quality):
     # (not an error because someone might ask for a specific start, no end, but we ended up with
     # end before start because that's the latest time we have)
     if start < end:
-        segments = get_best_segments(hours_path, start, end)
+        cache_key = (hours_path, start, end)
+        if cache_key in _media_playlist_cache:
+            segments = _media_playlist_cache[cache_key].get()
+        else:
+            result = gevent.event.AsyncResult()
+            try:
+                # Note we don't populate the cache until we're in the try block,
+                # so there is no point where an exception won't be transferred to the result.
+                _media_playlist_cache[cache_key] = result
+                segments = list(get_best_segments(hours_path, start, end))
+                result.set(segments)
+            except BaseException as ex:
+                result.set_exception(ex)
+                raise
+            # Now we're done, remove the async result so a fresh request can start.
+            assert _media_playlist_cache.pop(cache_key) is result, "Got someone else's AsyncResult"
     else:
         # Note the None to indicate there was a "hole" at both start and end
         segments = [None]
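
For reference, the same request-coalescing pattern can be shown as a standalone sketch that assumes only gevent. The names coalesce, _in_flight and expensive_lookup are illustrative and not part of the wubloader codebase; unlike the commit above, this sketch also pops the cache entry in a finally block, so a failed attempt does not keep serving its exception to later callers.

# Standalone sketch of the coalescing pattern, assuming only gevent.
# coalesce(), _in_flight and expensive_lookup() are illustrative names,
# not taken from the wubloader code.
import gevent
import gevent.event

# Maps key -> AsyncResult for a call that is currently in progress.
_in_flight = {}

def coalesce(key, func, *args):
    """Run func(*args), but if an identical call (same key) is already running,
    wait for it and share its result instead of repeating the work."""
    if key in _in_flight:
        # Someone else is already doing this work; block until they finish.
        # get() returns their value, or re-raises their exception.
        return _in_flight[key].get()
    result = gevent.event.AsyncResult()
    try:
        # Publish the AsyncResult before starting the work. Greenlets only switch
        # on blocking calls, so no other request can slip in between the
        # membership check above and this assignment.
        _in_flight[key] = result
        value = func(*args)
        result.set(value)
        return value
    except BaseException as ex:
        # Hand the failure to any waiters, then propagate it to our own caller.
        result.set_exception(ex)
        raise
    finally:
        # Remove the entry so the next request starts fresh; doing it in a finally
        # means a failed attempt doesn't pin its exception in the cache.
        assert _in_flight.pop(key) is result

def expensive_lookup(x):
    gevent.sleep(1)  # stand-in for slow work like building a large media playlist
    return x * 2

# Ten concurrent requests for the same key only do the expensive work once.
jobs = [gevent.spawn(coalesce, "same-key", expensive_lookup, 21) for _ in range(10)]
gevent.joinall(jobs)
print([job.value for job in jobs])  # [42, 42, ..., 42]

As in the commit, an entry only lives for the duration of one in-flight request; once the work finishes it is dropped, so this is request coalescing rather than a result cache, and a later identical request redoes the work.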
