From 1add3c5c22508f3c72ac38d9cca4361d65e68b12 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Sun, 8 May 2022 19:35:30 +1000 Subject: [PATCH] Implement tombstoning to allow for segment deletion Rarely, we find ourselves needing to explicitly delete some data, eg. something that shouldn't have been public and should be removed from all records. It would also be nice if we could "clean up" bad versions of the same segment, which occasionally come up when downloaders have issues. With our distributed segment database, this is actually rather difficult as deleting the data from any one server would cause it to be restored from the others. It was only possible by stopping all backfill, deleting the data on all servers, then starting backfill again. Here we introduce a more practical approach. An operator creates an empty flag file with the same name as the segment to be deleted, but with a `.tombstone` extension. eg. to delete a file `/segments/desertbus/source/2019-11-13T02/45:51.608000-2.0-full-7IS92rssMzoSBQDIevHStbTNy-URRV3Vw-jzZ6pwOZM.ts`, you would create a tombstone `/segments/desertbus/source/2019-11-13T02/45:51.608000-2.0-full-7IS92rssMzoSBQDIevHStbTNy-URRV3Vw-jzZ6pwOZM.tombstone`. These tombstone files do two important things: * They hide the segment from being listed, which both means: * It can't be restreamed or put into a video * It can't be backfilled to other nodes * The tombstone files themselves do get backfilled to other nodes, so you only need to mark them on one server. Once the tombstone has propagated to all nodes, the segment file can be deleted independently on each one. We chose not to have a tombstone automatically trigger a segment deletion for safety reasons. --- backfiller/backfiller/main.py | 41 +++++++++++++------- common/common/segments.py | 46 +++++++++++++++++++---- downloader/downloader/main.py | 8 +++- restreamer/restreamer/main.py | 9 ++++- segment_coverage/segment_coverage/main.py | 8 +--- 5 files changed, 82 insertions(+), 30 deletions(-) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index d1113e5..416dff1 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -21,6 +21,7 @@ import common from common import dateutil from common import database from common.requests import InstrumentedSession +from common.segments import list_segment_files # Wraps all requests in some metric collection requests = InstrumentedSession() @@ -83,8 +84,8 @@ def list_local_hours(base_dir, channel, quality): return [] -def list_local_segments(base_dir, channel, quality, hour): - """List segments in a given hour directory. +def list_local_segments(base_dir, channel, quality, hour, include_tombstones=False): + """List segment files + optionally tombstone files in a given hour directory. For a given base_dir/channel/quality/hour directory return a list of non-hidden files. If the directory path is not found, return an empty list. @@ -93,13 +94,7 @@ def list_local_segments(base_dir, channel, quality, hour): restreamer.list_segments but this avoids HTTP/JSON overheads.""" path = os.path.join(base_dir, channel, quality, hour) - try: - return [name for name in os.listdir(path) if not name.startswith('.')] - - except OSError as e: - if e.errno != errno.ENOENT: - raise - return [] + return list_segment_files(path, include_tombstones=include_tombstones) def list_remote_hours(node, channel, quality, timeout=TIMEOUT): @@ -111,10 +106,12 @@ def list_remote_hours(node, channel, quality, timeout=TIMEOUT): def list_remote_segments(node, channel, quality, hour, timeout=TIMEOUT): - """Wrapper around a call to restreamer.list_segments.""" + """Wrapper around a call to restreamer.list_segments. + Lists segments as well as tombstone files. + """ uri = '{}/files/{}/{}/{}'.format(node, channel, quality, hour) logging.debug('Getting list of segments from {}'.format(uri)) - resp = requests.get(uri, timeout=timeout, metric_name='list_remote_segments') + resp = requests.get(uri, params={"tombstones": "true"}, timeout=timeout, metric_name='list_remote_segments') return resp.json() @@ -272,6 +269,9 @@ class BackfillerManager(object): gevent.idle() path = os.path.join(self.base_dir, channel, quality, hour) self.logger.info('Deleting {}'.format(path)) + # note we make no attempt to delete the tombstones. + # this is to avoid a race condition where we delete the tombstone and let the original + # file get served before it also gets deleted. segments = list_local_segments(self.base_dir, channel, quality, hour) for segment in segments: try: @@ -289,6 +289,7 @@ class BackfillerManager(object): if e.errno == errno.ENOENT: self.logger.warn('{} already deleted'.format(path)) # warn if not empty (will try to delete folder again next time) + # this is likely to happen if there are hidden temp files or tombstones left over. elif e.errno == errno.ENOTEMPTY: self.logger.warn('Failed to delete non-empty folder {}'.format(path)) else: @@ -447,8 +448,12 @@ class BackfillerWorker(object): break self.logger.info('Backfilling {}/{}'.format(quality, hour)) - - local_segments = set(list_local_segments(self.base_dir, channel, quality, hour)) + + # Note that both local and remote listings include tombstone files, + # but not the associated segment files for those tombstones. + # This way tombstones will be propagated by the backfiller but the segments + # they mark will not. + local_segments = set(list_local_segments(self.base_dir, channel, quality, hour, include_tombstones=True)) remote_segments = set(list_remote_segments(self.node, channel, quality, hour)) missing_segments = list(remote_segments - local_segments) @@ -465,7 +470,15 @@ class BackfillerWorker(object): return path = os.path.join(channel, quality, hour, missing_segment) - + + # tombstone files are empty markers, there's no need to download them. + # we just create a new empty file locally. + if path.endswith('.tombstone'): + # create empty file by opening in 'w' mode then immediately closing it + with open(os.path.join(self.base_dir, path), 'w'): + pass + continue + # test to see if file is a segment and get the segments start time try: segment = common.parse_segment_path(path) diff --git a/common/common/segments.py b/common/common/segments.py index 3a2fdff..557afb1 100644 --- a/common/common/segments.py +++ b/common/common/segments.py @@ -224,6 +224,44 @@ def hour_paths_for_range(hours_path, start, end): current += datetime.timedelta(hours=1) +def list_segment_files(hour_path, include_tombstones=False): + """Return a list of filenames of segments in the given hour path. + Segment names are not parsed or verified, but only non-hidden .ts files + without an associated tombstone file will be listed. + If include_tombstones = true, the tombstone files themselves will also be listed. + """ + try: + names = os.listdir(hour_path) + except OSError as e: + if e.errno != errno.ENOENT: + raise + # path does not exist, treat it as having no files + return [] + + # Split into name and extension, this makes the later processing easier. + # Note that ext will include the leading dot, ie. "foo.bar" -> ("foo", ".bar"). + # Files with no extension produce empty string, ie. "foo" -> ("foo", "") + # and files with leading dots treat them as part of the name, ie. ".foo" -> (".foo", ""). + splits = [os.path.splitext(name) for name in names] + + # Look for any tombstone files, which indicate we should treat the segment file of the same + # name as though it doesn't exist. + tombstones = [name for name, ext in splits if ext == '.tombstone'] + + # Return non-hidden ts files, except those that match a tombstone. + segments = [ + name + ext for name, ext in splits + if name not in tombstones + and ext == ".ts" + and not name.startswith('.') + ] + + if include_tombstones: + return segments + ["{}.tombstone".format(name) for name in tombstones] + else: + return segments + + # Maps hour path to (directory contents, cached result). # If the directory contents are identical, then we can use the cached result for that hour # instead of re-calculating. If they have changed, we throw out the cached result. @@ -237,13 +275,7 @@ def best_segments_by_start(hour): Best is defined as type=full, or failing that type=suspect, or failing that the longest type=partial. Note this means this function may perform os.stat()s. """ - try: - segment_paths = os.listdir(hour) - except OSError as e: - if e.errno != errno.ENOENT: - raise - # path does not exist, treat it as having no files - segment_paths = [] + segment_paths = list_segment_files(hour) segment_paths.sort() # if result is in the cache and the segment_paths haven't changed, return cached result diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index f11b34b..c987392 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -528,7 +528,13 @@ class SegmentGetter(object): raise return False full_prefix = "{}-full".format(self.prefix) - return any(candidate.startswith(full_prefix) for candidate in candidates) + return any( + candidate.startswith(full_prefix) + # There's almost no way a matching tombstone could already exist, but just in case + # we'll make sure it isn't counted. + and not candidate.endswith(".tombstone") + for candidate in candidates + ) def get_segment(self): try: diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index daa593d..a481f69 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -16,7 +16,7 @@ from gevent.pywsgi import WSGIServer from common import dateutil, get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, PromLogCountsHandler, install_stacksampler, serve_with_graceful_shutdown from common.flask_stats import request_stats, after_request -from common.segments import feed_input, render_segments_waveform, extract_frame +from common.segments import feed_input, render_segments_waveform, extract_frame, list_segment_files from . import generate_hls @@ -138,6 +138,7 @@ def list_hours(channel, quality): def list_segments(channel, quality, hour): """Returns a JSON list of segment files for a given channel, quality and hour. Returns empty list on non-existant channels, etc. + If tombstones = "true", will also list tombstone files for that hour. """ path = os.path.join( app.static_folder, @@ -145,7 +146,11 @@ def list_segments(channel, quality, hour): quality, hour, ) - return json.dumps(listdir(path, error=False)) + tombstones = request.args.get('tombstones', 'false').lower() + if tombstones not in ["true", "false"]: + return "tombstones must be one of: true, false", 400 + tombstones = (tombstones == "true") + return json.dumps(list_segment_files(path, include_tombstones=tombstones)) def time_range_for_quality(channel, quality): diff --git a/segment_coverage/segment_coverage/main.py b/segment_coverage/segment_coverage/main.py index ffbbb1b..5cae8f4 100644 --- a/segment_coverage/segment_coverage/main.py +++ b/segment_coverage/segment_coverage/main.py @@ -16,6 +16,7 @@ import prometheus_client as prom import common from common import dateutil from common import database +from common.segments import list_segment_files segment_count_gauge = prom.Gauge( @@ -272,12 +273,7 @@ class CoverageChecker(object): # based on common.segments.best_segments_by_start # but more complicated to capture more detailed metrics hour_path = os.path.join(self.base_dir, self.channel, quality, hour) - try: - segment_names = [name for name in os.listdir(hour_path) if not name.startswith('.')] - except OSError as e: - if e.errno == errno.ENOENT: - self.logger.warning('Hour {} was deleted between finding it and processing it, ignoring'.format(hour)) - continue + segment_names = list_segment_files(hour_path) segment_names.sort() parsed = [] bad_segment_count = 0