Implement tombstoning to allow for segment deletion

Rarely, we find ourselves needing to explicitly delete some data, eg. something that shouldn't
have been public and should be removed from all records.

It would also be nice if we could "clean up" bad versions of the same segment,
which occasionally come up when downloaders have issues.

With our distributed segment database, this is actually rather difficult as deleting the data
from any one server would cause it to be restored from the others. It was only possible
by stopping all backfill, deleting the data on all servers, then starting backfill again.

Here we introduce a more practical approach. An operator creates an empty flag file
with the same name as the segment to be deleted, but with a `.tombstone` extension.
eg. to delete a file `/segments/desertbus/source/2019-11-13T02/45:51.608000-2.0-full-7IS92rssMzoSBQDIevHStbTNy-URRV3Vw-jzZ6pwOZM.ts`,
you would create a tombstone `/segments/desertbus/source/2019-11-13T02/45:51.608000-2.0-full-7IS92rssMzoSBQDIevHStbTNy-URRV3Vw-jzZ6pwOZM.tombstone`.

These tombstone files do two important things:
* They hide the segment from being listed, which both means:
  * It can't be restreamed or put into a video
  * It can't be backfilled to other nodes
* The tombstone files themselves do get backfilled to other nodes, so you only need to mark them on one server.

Once the tombstone has propagated to all nodes, the segment file can be deleted independently on each one.

We chose not to have a tombstone automatically trigger a segment deletion for safety reasons.
pull/295/head
Mike Lang 3 years ago committed by Mike Lang
parent 44d0c0269a
commit 1add3c5c22

@ -21,6 +21,7 @@ import common
from common import dateutil
from common import database
from common.requests import InstrumentedSession
from common.segments import list_segment_files
# Wraps all requests in some metric collection
requests = InstrumentedSession()
@ -83,8 +84,8 @@ def list_local_hours(base_dir, channel, quality):
return []
def list_local_segments(base_dir, channel, quality, hour):
"""List segments in a given hour directory.
def list_local_segments(base_dir, channel, quality, hour, include_tombstones=False):
"""List segment files + optionally tombstone files in a given hour directory.
For a given base_dir/channel/quality/hour directory return a list of
non-hidden files. If the directory path is not found, return an empty list.
@ -93,13 +94,7 @@ def list_local_segments(base_dir, channel, quality, hour):
restreamer.list_segments but this avoids HTTP/JSON overheads."""
path = os.path.join(base_dir, channel, quality, hour)
try:
return [name for name in os.listdir(path) if not name.startswith('.')]
except OSError as e:
if e.errno != errno.ENOENT:
raise
return []
return list_segment_files(path, include_tombstones=include_tombstones)
def list_remote_hours(node, channel, quality, timeout=TIMEOUT):
@ -111,10 +106,12 @@ def list_remote_hours(node, channel, quality, timeout=TIMEOUT):
def list_remote_segments(node, channel, quality, hour, timeout=TIMEOUT):
"""Wrapper around a call to restreamer.list_segments."""
"""Wrapper around a call to restreamer.list_segments.
Lists segments as well as tombstone files.
"""
uri = '{}/files/{}/{}/{}'.format(node, channel, quality, hour)
logging.debug('Getting list of segments from {}'.format(uri))
resp = requests.get(uri, timeout=timeout, metric_name='list_remote_segments')
resp = requests.get(uri, params={"tombstones": "true"}, timeout=timeout, metric_name='list_remote_segments')
return resp.json()
@ -272,6 +269,9 @@ class BackfillerManager(object):
gevent.idle()
path = os.path.join(self.base_dir, channel, quality, hour)
self.logger.info('Deleting {}'.format(path))
# note we make no attempt to delete the tombstones.
# this is to avoid a race condition where we delete the tombstone and let the original
# file get served before it also gets deleted.
segments = list_local_segments(self.base_dir, channel, quality, hour)
for segment in segments:
try:
@ -289,6 +289,7 @@ class BackfillerManager(object):
if e.errno == errno.ENOENT:
self.logger.warn('{} already deleted'.format(path))
# warn if not empty (will try to delete folder again next time)
# this is likely to happen if there are hidden temp files or tombstones left over.
elif e.errno == errno.ENOTEMPTY:
self.logger.warn('Failed to delete non-empty folder {}'.format(path))
else:
@ -448,7 +449,11 @@ class BackfillerWorker(object):
self.logger.info('Backfilling {}/{}'.format(quality, hour))
local_segments = set(list_local_segments(self.base_dir, channel, quality, hour))
# Note that both local and remote listings include tombstone files,
# but not the associated segment files for those tombstones.
# This way tombstones will be propagated by the backfiller but the segments
# they mark will not.
local_segments = set(list_local_segments(self.base_dir, channel, quality, hour, include_tombstones=True))
remote_segments = set(list_remote_segments(self.node, channel, quality, hour))
missing_segments = list(remote_segments - local_segments)
@ -466,6 +471,14 @@ class BackfillerWorker(object):
path = os.path.join(channel, quality, hour, missing_segment)
# tombstone files are empty markers, there's no need to download them.
# we just create a new empty file locally.
if path.endswith('.tombstone'):
# create empty file by opening in 'w' mode then immediately closing it
with open(os.path.join(self.base_dir, path), 'w'):
pass
continue
# test to see if file is a segment and get the segments start time
try:
segment = common.parse_segment_path(path)

@ -224,6 +224,44 @@ def hour_paths_for_range(hours_path, start, end):
current += datetime.timedelta(hours=1)
def list_segment_files(hour_path, include_tombstones=False):
"""Return a list of filenames of segments in the given hour path.
Segment names are not parsed or verified, but only non-hidden .ts files
without an associated tombstone file will be listed.
If include_tombstones = true, the tombstone files themselves will also be listed.
"""
try:
names = os.listdir(hour_path)
except OSError as e:
if e.errno != errno.ENOENT:
raise
# path does not exist, treat it as having no files
return []
# Split into name and extension, this makes the later processing easier.
# Note that ext will include the leading dot, ie. "foo.bar" -> ("foo", ".bar").
# Files with no extension produce empty string, ie. "foo" -> ("foo", "")
# and files with leading dots treat them as part of the name, ie. ".foo" -> (".foo", "").
splits = [os.path.splitext(name) for name in names]
# Look for any tombstone files, which indicate we should treat the segment file of the same
# name as though it doesn't exist.
tombstones = [name for name, ext in splits if ext == '.tombstone']
# Return non-hidden ts files, except those that match a tombstone.
segments = [
name + ext for name, ext in splits
if name not in tombstones
and ext == ".ts"
and not name.startswith('.')
]
if include_tombstones:
return segments + ["{}.tombstone".format(name) for name in tombstones]
else:
return segments
# Maps hour path to (directory contents, cached result).
# If the directory contents are identical, then we can use the cached result for that hour
# instead of re-calculating. If they have changed, we throw out the cached result.
@ -237,13 +275,7 @@ def best_segments_by_start(hour):
Best is defined as type=full, or failing that type=suspect, or failing that the longest type=partial.
Note this means this function may perform os.stat()s.
"""
try:
segment_paths = os.listdir(hour)
except OSError as e:
if e.errno != errno.ENOENT:
raise
# path does not exist, treat it as having no files
segment_paths = []
segment_paths = list_segment_files(hour)
segment_paths.sort()
# if result is in the cache and the segment_paths haven't changed, return cached result

@ -528,7 +528,13 @@ class SegmentGetter(object):
raise
return False
full_prefix = "{}-full".format(self.prefix)
return any(candidate.startswith(full_prefix) for candidate in candidates)
return any(
candidate.startswith(full_prefix)
# There's almost no way a matching tombstone could already exist, but just in case
# we'll make sure it isn't counted.
and not candidate.endswith(".tombstone")
for candidate in candidates
)
def get_segment(self):
try:

@ -16,7 +16,7 @@ from gevent.pywsgi import WSGIServer
from common import dateutil, get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, PromLogCountsHandler, install_stacksampler, serve_with_graceful_shutdown
from common.flask_stats import request_stats, after_request
from common.segments import feed_input, render_segments_waveform, extract_frame
from common.segments import feed_input, render_segments_waveform, extract_frame, list_segment_files
from . import generate_hls
@ -138,6 +138,7 @@ def list_hours(channel, quality):
def list_segments(channel, quality, hour):
"""Returns a JSON list of segment files for a given channel, quality and
hour. Returns empty list on non-existant channels, etc.
If tombstones = "true", will also list tombstone files for that hour.
"""
path = os.path.join(
app.static_folder,
@ -145,7 +146,11 @@ def list_segments(channel, quality, hour):
quality,
hour,
)
return json.dumps(listdir(path, error=False))
tombstones = request.args.get('tombstones', 'false').lower()
if tombstones not in ["true", "false"]:
return "tombstones must be one of: true, false", 400
tombstones = (tombstones == "true")
return json.dumps(list_segment_files(path, include_tombstones=tombstones))
def time_range_for_quality(channel, quality):

@ -16,6 +16,7 @@ import prometheus_client as prom
import common
from common import dateutil
from common import database
from common.segments import list_segment_files
segment_count_gauge = prom.Gauge(
@ -272,12 +273,7 @@ class CoverageChecker(object):
# based on common.segments.best_segments_by_start
# but more complicated to capture more detailed metrics
hour_path = os.path.join(self.base_dir, self.channel, quality, hour)
try:
segment_names = [name for name in os.listdir(hour_path) if not name.startswith('.')]
except OSError as e:
if e.errno == errno.ENOENT:
self.logger.warning('Hour {} was deleted between finding it and processing it, ignoring'.format(hour))
continue
segment_names = list_segment_files(hour_path)
segment_names.sort()
parsed = []
bad_segment_count = 0

Loading…
Cancel
Save