cache results of common.segments.best_segments_by_start

The restreamer spends most of its time iterating through segments (parsing them, determining the best one for each start time)
to serve large time ranges. Since this only depends on the list of filenames read from disk,
we can cache it for a given hour as long as that list is identical.

This is a little trickier than it sounds because best_segments_by_start is an iterator
and in most cases it won't be fully consumed. So we introduce a `CachedIterator` abstraction
that both remembers the previously yielded values and keeps track of the live iterator,
so it can be resumed if a previous invocation only partially consumed it.
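
For illustration, the intended semantics look like this (a sketch against the CachedIterator class added in the diff below, not part of the commit):

import itertools

ci = CachedIterator(iter(range(5)))
list(itertools.islice(ci, 2))  # pulls 0 and 1 from the wrapped iterator and remembers them
list(ci)                       # re-serves 0 and 1 from cache, then resumes pulling 2, 3 and 4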

This also has the nice side effect of merging simultaneous operations - if two requests come in
for the same hour at the same time, they'll share one iterator and both consume the results
as they come in.
Branch: pull/290/head
Author: Mike Lang (committed by Mike Lang)
Parent: 871925aef5
Commit: 44d0c0269a

@@ -0,0 +1,44 @@
import itertools

import gevent.lock


class CachedIterator():
    """Wraps an iterator. When you iterate over this, it pulls items from the wrapped iterator
    as needed, but remembers each one. When you iterate over it again, it will re-serve the
    yielded items in the same order, until it runs out, in which case it starts consuming
    from the wrapped iterator again.

    gevent-safe.
    """

    def __init__(self, iterator):
        self.iterator = iterator  # Replaced with None once it's exhausted
        self.items = []
        self.lock = gevent.lock.RLock()

    def __iter__(self):
        # We use a loop index here because self.items may lengthen between loops
        for i in itertools.count():
            # are we beyond the end of the array?
            if len(self.items) <= i:
                # If we're more than 1 beyond the end, something has gone horribly wrong.
                # We should've already lengthened it last iteration.
                assert len(self.items) == i, "CachedIterator logic error: {} != {}".format(len(self.items), i)
                # Check if the iterator is still active. If not, we've reached the end.
                if self.iterator is None:
                    return
                # Note we don't need the lock up until now because we're only trying to be
                # gevent-safe, not thread-safe. Simple operations like checking lengths can't
                # be interrupted. However, calling next on the iterator may cause a switch.
                with self.lock:
                    try:
                        item = next(self.iterator)
                    except StopIteration:
                        # We've reached the end. Discard the iterator (in theory an iterator
                        # that has raised StopIteration once will keep raising it every time
                        # thereafter, but best not to rely on that).
                        self.iterator = None
                        # And we're done.
                        return
                    self.items.append(item)
            yield self.items[i]
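
A sketch of the merging behaviour described in the commit message (assuming the class above is importable; the produce generator is made up for this example):

import gevent

def produce():
    for i in range(3):
        print("producing", i)  # printed once per item, not once per consumer
        yield i

ci = CachedIterator(produce())

def consume(name):
    for item in ci:
        print(name, "got", item)

# Both greenlets share one underlying iterator: each item is produced once,
# cached, and then served to every consumer.
gevent.joinall([gevent.spawn(consume, "a"), gevent.spawn(consume, "b")])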

@@ -17,6 +17,7 @@ from tempfile import TemporaryFile
import gevent
from gevent import subprocess

from .cached_iterator import CachedIterator
from .stats import timed
@@ -223,6 +224,14 @@ def hour_paths_for_range(hours_path, start, end):
        current += datetime.timedelta(hours=1)


# Maps hour path to (directory contents, cached result).
# If the directory contents are identical, then we can use the cached result for that hour
# instead of re-calculating. If they have changed, we throw out the cached result.
# Since best_segments_by_start returns an iterator that may not be entirely consumed,
# our cached result stores both all results returned so far, and the live iterator
# in case we need to continue consuming.
_best_segments_by_start_cache = {}


def best_segments_by_start(hour):
    """Within a given hour path, yield the "best" segment per unique segment start time.
    Best is defined as type=full, or failing that type=suspect, or failing that the longest type=partial.
@@ -234,8 +243,22 @@ def best_segments_by_start(hour):
        if e.errno != errno.ENOENT:
            raise
        # path does not exist, treat it as having no files
        segment_paths = []
    segment_paths.sort()

    # if result is in the cache and the segment_paths haven't changed, return cached result
    if hour in _best_segments_by_start_cache:
        prev_segment_paths, cached_result = _best_segments_by_start_cache[hour]
        if prev_segment_paths == segment_paths:
            return cached_result

    # otherwise create new result and cache it
    result = CachedIterator(_best_segments_by_start(hour, segment_paths))
    _best_segments_by_start_cache[hour] = segment_paths, result
    return result


def _best_segments_by_start(hour, segment_paths):
    # raise a warning for any files that don't parse as segments and ignore them
    parsed = []
    for name in segment_paths:
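
The effect on callers can be sketched like this (the hour path is hypothetical; best_segments_by_start as patched above):

hour = "/segments/example_channel/source/2021-01-01T00"  # hypothetical path

first = best_segments_by_start(hour)   # lists the directory, caches (listing, CachedIterator)
second = best_segments_by_start(hour)  # listing unchanged, so the same CachedIterator is returned
assert second is first

# Once a new segment file lands in that hour, the listings differ and the
# stale cache entry is replaced with a fresh iterator.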
