From a6d4f2466a0c6018f8881bd24e36923ff0bb43a8 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Sun, 26 Nov 2023 14:48:16 +1100
Subject: [PATCH] restreamer: coalesce concurrent requests for the same
 playlist URL

---
 restreamer/restreamer/main.py | 26 +++++++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py
index 3f1500f..8df0b5f 100644
--- a/restreamer/restreamer/main.py
+++ b/restreamer/restreamer/main.py
@@ -10,6 +10,7 @@ from uuid import uuid4
 
 import gevent
 import gevent.backdoor
+import gevent.event
 import prometheus_client as prom
 from flask import Flask, url_for, request, abort, Response
 from gevent.pywsgi import WSGIServer
@@ -232,6 +233,14 @@ def generate_master_playlist(channel):
 	return generate_hls.generate_master(playlists)
 
 
+# Generating large media playlists is expensive, especially on the first run
+# where the cache is cold. And the video player will make repeated requests.
+# To avoid requests piling up and repeating work, if we get the exact same request again
+# while the old request is in progress, we piggyback on the previous request and return
+# the same result.
+# This cache object maps (hours_path, start, end) to an AsyncResult.
+_media_playlist_cache = {}
+
 @app.route('/playlist/<channel>/<quality>.m3u8')
 @request_stats
 @has_path_args
@@ -270,7 +279,22 @@ def generate_media_playlist(channel, quality):
 	# (not an error because someone might ask for a specific start, no end, but we ended up with
 	# end before start because that's the latest time we have)
 	if start < end:
-		segments = get_best_segments(hours_path, start, end)
+		cache_key = (hours_path, start, end)
+		if cache_key in _media_playlist_cache:
+			segments = _media_playlist_cache[cache_key].get()
+		else:
+			result = gevent.event.AsyncResult()
+			try:
+				# Note we don't populate the cache until we're in the try block,
+				# so there is no point where an exception won't be transferred to the result.
+				_media_playlist_cache[cache_key] = result
+				segments = list(get_best_segments(hours_path, start, end))
+				result.set(segments)
+			except BaseException as ex:
+				result.set_exception(ex)
+				raise
+			# Now we're done, remove the async result so a fresh request can start.
+			assert _media_playlist_cache.pop(cache_key) is result, "Got someone else's AsyncResult"
 	else:
 		# Note the None to indicate there was a "hole" at both start and end
 		segments = [None]
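
Reviewer note (not part of the patch): below is a minimal standalone sketch of the same
coalescing pattern using gevent.event.AsyncResult. The names expensive_lookup and
coalesced_lookup are hypothetical stand-ins for get_best_segments and the playlist handler;
this illustrates the technique under those assumptions, it is not the project's API.
The first caller publishes the value (or the exception) on the AsyncResult, every caller
that arrives while it is in flight blocks on .get() and shares that outcome, and the
cache entry is removed afterwards so later requests recompute fresh data.

	# Illustrative sketch only; names are hypothetical, not from restreamer/main.py.
	import gevent
	import gevent.event

	_cache = {}

	def expensive_lookup(key):
		# Placeholder for the expensive work (e.g. scanning segment files).
		gevent.sleep(1)
		return "result for %r" % (key,)

	def coalesced_lookup(key):
		if key in _cache:
			# Someone else is already computing this; wait for their result.
			# .get() re-raises any exception set via set_exception().
			return _cache[key].get()
		result = gevent.event.AsyncResult()
		try:
			# Populate the cache inside the try block so any failure is
			# always transferred to waiters via set_exception().
			_cache[key] = result
			value = expensive_lookup(key)
			result.set(value)
		except BaseException as ex:
			result.set_exception(ex)
			raise
		# Clear the entry so a later request recomputes fresh data.
		assert _cache.pop(key) is result
		return value

	# Five concurrent requests for the same key do the expensive work only once.
	greenlets = [gevent.spawn(coalesced_lookup, ("hours", 1, 2)) for _ in range(5)]
	gevent.joinall(greenlets)
	print([g.get() for g in greenlets])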