diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index d1113e5..416dff1 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -21,6 +21,7 @@ import common from common import dateutil from common import database from common.requests import InstrumentedSession +from common.segments import list_segment_files # Wraps all requests in some metric collection requests = InstrumentedSession() @@ -83,8 +84,8 @@ def list_local_hours(base_dir, channel, quality): return [] -def list_local_segments(base_dir, channel, quality, hour): - """List segments in a given hour directory. +def list_local_segments(base_dir, channel, quality, hour, include_tombstones=False): + """List segment files + optionally tombstone files in a given hour directory. For a given base_dir/channel/quality/hour directory return a list of non-hidden files. If the directory path is not found, return an empty list. @@ -93,13 +94,7 @@ def list_local_segments(base_dir, channel, quality, hour): restreamer.list_segments but this avoids HTTP/JSON overheads.""" path = os.path.join(base_dir, channel, quality, hour) - try: - return [name for name in os.listdir(path) if not name.startswith('.')] - - except OSError as e: - if e.errno != errno.ENOENT: - raise - return [] + return list_segment_files(path, include_tombstones=include_tombstones) def list_remote_hours(node, channel, quality, timeout=TIMEOUT): @@ -111,10 +106,12 @@ def list_remote_hours(node, channel, quality, timeout=TIMEOUT): def list_remote_segments(node, channel, quality, hour, timeout=TIMEOUT): - """Wrapper around a call to restreamer.list_segments.""" + """Wrapper around a call to restreamer.list_segments. + Lists segments as well as tombstone files. + """ uri = '{}/files/{}/{}/{}'.format(node, channel, quality, hour) logging.debug('Getting list of segments from {}'.format(uri)) - resp = requests.get(uri, timeout=timeout, metric_name='list_remote_segments') + resp = requests.get(uri, params={"tombstones": "true"}, timeout=timeout, metric_name='list_remote_segments') return resp.json() @@ -272,6 +269,9 @@ class BackfillerManager(object): gevent.idle() path = os.path.join(self.base_dir, channel, quality, hour) self.logger.info('Deleting {}'.format(path)) + # note we make no attempt to delete the tombstones. + # this is to avoid a race condition where we delete the tombstone and let the original + # file get served before it also gets deleted. segments = list_local_segments(self.base_dir, channel, quality, hour) for segment in segments: try: @@ -289,6 +289,7 @@ class BackfillerManager(object): if e.errno == errno.ENOENT: self.logger.warn('{} already deleted'.format(path)) # warn if not empty (will try to delete folder again next time) + # this is likely to happen if there are hidden temp files or tombstones left over. elif e.errno == errno.ENOTEMPTY: self.logger.warn('Failed to delete non-empty folder {}'.format(path)) else: @@ -447,8 +448,12 @@ class BackfillerWorker(object): break self.logger.info('Backfilling {}/{}'.format(quality, hour)) - - local_segments = set(list_local_segments(self.base_dir, channel, quality, hour)) + + # Note that both local and remote listings include tombstone files, + # but not the associated segment files for those tombstones. + # This way tombstones will be propagated by the backfiller but the segments + # they mark will not. + local_segments = set(list_local_segments(self.base_dir, channel, quality, hour, include_tombstones=True)) remote_segments = set(list_remote_segments(self.node, channel, quality, hour)) missing_segments = list(remote_segments - local_segments) @@ -465,7 +470,15 @@ class BackfillerWorker(object): return path = os.path.join(channel, quality, hour, missing_segment) - + + # tombstone files are empty markers, there's no need to download them. + # we just create a new empty file locally. + if path.endswith('.tombstone'): + # create empty file by opening in 'w' mode then immediately closing it + with open(os.path.join(self.base_dir, path), 'w'): + pass + continue + # test to see if file is a segment and get the segments start time try: segment = common.parse_segment_path(path) diff --git a/common/common/segments.py b/common/common/segments.py index 3a2fdff..557afb1 100644 --- a/common/common/segments.py +++ b/common/common/segments.py @@ -224,6 +224,44 @@ def hour_paths_for_range(hours_path, start, end): current += datetime.timedelta(hours=1) +def list_segment_files(hour_path, include_tombstones=False): + """Return a list of filenames of segments in the given hour path. + Segment names are not parsed or verified, but only non-hidden .ts files + without an associated tombstone file will be listed. + If include_tombstones = true, the tombstone files themselves will also be listed. + """ + try: + names = os.listdir(hour_path) + except OSError as e: + if e.errno != errno.ENOENT: + raise + # path does not exist, treat it as having no files + return [] + + # Split into name and extension, this makes the later processing easier. + # Note that ext will include the leading dot, ie. "foo.bar" -> ("foo", ".bar"). + # Files with no extension produce empty string, ie. "foo" -> ("foo", "") + # and files with leading dots treat them as part of the name, ie. ".foo" -> (".foo", ""). + splits = [os.path.splitext(name) for name in names] + + # Look for any tombstone files, which indicate we should treat the segment file of the same + # name as though it doesn't exist. + tombstones = [name for name, ext in splits if ext == '.tombstone'] + + # Return non-hidden ts files, except those that match a tombstone. + segments = [ + name + ext for name, ext in splits + if name not in tombstones + and ext == ".ts" + and not name.startswith('.') + ] + + if include_tombstones: + return segments + ["{}.tombstone".format(name) for name in tombstones] + else: + return segments + + # Maps hour path to (directory contents, cached result). # If the directory contents are identical, then we can use the cached result for that hour # instead of re-calculating. If they have changed, we throw out the cached result. @@ -237,13 +275,7 @@ def best_segments_by_start(hour): Best is defined as type=full, or failing that type=suspect, or failing that the longest type=partial. Note this means this function may perform os.stat()s. """ - try: - segment_paths = os.listdir(hour) - except OSError as e: - if e.errno != errno.ENOENT: - raise - # path does not exist, treat it as having no files - segment_paths = [] + segment_paths = list_segment_files(hour) segment_paths.sort() # if result is in the cache and the segment_paths haven't changed, return cached result diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index f11b34b..c987392 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -528,7 +528,13 @@ class SegmentGetter(object): raise return False full_prefix = "{}-full".format(self.prefix) - return any(candidate.startswith(full_prefix) for candidate in candidates) + return any( + candidate.startswith(full_prefix) + # There's almost no way a matching tombstone could already exist, but just in case + # we'll make sure it isn't counted. + and not candidate.endswith(".tombstone") + for candidate in candidates + ) def get_segment(self): try: diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index daa593d..a481f69 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -16,7 +16,7 @@ from gevent.pywsgi import WSGIServer from common import dateutil, get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, PromLogCountsHandler, install_stacksampler, serve_with_graceful_shutdown from common.flask_stats import request_stats, after_request -from common.segments import feed_input, render_segments_waveform, extract_frame +from common.segments import feed_input, render_segments_waveform, extract_frame, list_segment_files from . import generate_hls @@ -138,6 +138,7 @@ def list_hours(channel, quality): def list_segments(channel, quality, hour): """Returns a JSON list of segment files for a given channel, quality and hour. Returns empty list on non-existant channels, etc. + If tombstones = "true", will also list tombstone files for that hour. """ path = os.path.join( app.static_folder, @@ -145,7 +146,11 @@ def list_segments(channel, quality, hour): quality, hour, ) - return json.dumps(listdir(path, error=False)) + tombstones = request.args.get('tombstones', 'false').lower() + if tombstones not in ["true", "false"]: + return "tombstones must be one of: true, false", 400 + tombstones = (tombstones == "true") + return json.dumps(list_segment_files(path, include_tombstones=tombstones)) def time_range_for_quality(channel, quality): diff --git a/segment_coverage/segment_coverage/main.py b/segment_coverage/segment_coverage/main.py index ffbbb1b..5cae8f4 100644 --- a/segment_coverage/segment_coverage/main.py +++ b/segment_coverage/segment_coverage/main.py @@ -16,6 +16,7 @@ import prometheus_client as prom import common from common import dateutil from common import database +from common.segments import list_segment_files segment_count_gauge = prom.Gauge( @@ -272,12 +273,7 @@ class CoverageChecker(object): # based on common.segments.best_segments_by_start # but more complicated to capture more detailed metrics hour_path = os.path.join(self.base_dir, self.channel, quality, hour) - try: - segment_names = [name for name in os.listdir(hour_path) if not name.startswith('.')] - except OSError as e: - if e.errno == errno.ENOENT: - self.logger.warning('Hour {} was deleted between finding it and processing it, ignoring'.format(hour)) - continue + segment_names = list_segment_files(hour_path) segment_names.sort() parsed = [] bad_segment_count = 0