From cb929c12b324fae9bae4ec9360ff3699dd1f2f5d Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 7 Jan 2019 00:15:25 -0800 Subject: [PATCH] common: Split segment-related stuff into its own module We still import them into __init__.py so they're accessible externally just the same --- common/common/__init__.py | 206 +----------------------------------- common/common/segments.py | 214 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 215 insertions(+), 205 deletions(-) create mode 100644 common/common/segments.py diff --git a/common/common/__init__.py b/common/common/__init__.py index 90be4d9..15978c2 100644 --- a/common/common/__init__.py +++ b/common/common/__init__.py @@ -2,18 +2,12 @@ """A place for common utilities between wubloader components""" -import base64 import datetime import errno -import itertools -import logging import os import random -import sys -from collections import namedtuple - -import dateutil.parser +from .segments import get_best_segments, parse_segment_path, SegmentInfo from .stats import timed, PromLogCountsHandler @@ -64,204 +58,6 @@ def format_bustime(bustime, round="millisecond"): return sign + ":".join(parts) -def unpadded_b64_decode(s): - """Decode base64-encoded string that has had its padding removed""" - # right-pad with '=' to multiple of 4 - s = s + '=' * (- len(s) % 4) - return base64.b64decode(s, "-_") - - -class SegmentInfo( - namedtuple('SegmentInfoBase', [ - 'path', 'stream', 'variant', 'start', 'duration', 'is_partial', 'hash' - ]) -): - """Info parsed from a segment path, including original path. - Note that start time is a datetime and duration is a timedelta, and hash is a decoded binary string.""" - @property - def end(self): - return self.start + self.duration - - -def parse_segment_path(path): - """Parse segment path, returning a SegmentInfo. If path is only the trailing part, - eg. just a filename, it will leave unknown fields as None.""" - parts = path.split('/') - # left-pad parts with None up to 4 parts - parts = [None] * (4 - len(parts)) + parts - # pull info out of path parts - stream, variant, hour, filename = parts[-4:] - # split filename, which should be TIME-DURATION-TYPE-HASH.ts - try: - if not filename.endswith('.ts'): - raise ValueError("Does not end in .ts") - filename = filename[:-len('.ts')] # chop off .ts - parts = filename.split('-', 3) - if len(parts) != 4: - raise ValueError("Not enough dashes in filename") - time, duration, type, hash = parts - if type not in ('full', 'partial'): - raise ValueError("Unknown type {!r}".format(type)) - return SegmentInfo( - path = path, - stream = stream, - variant = variant, - start = dateutil.parser.parse("{}:{}".format(hour, time)), - duration = datetime.timedelta(seconds=float(duration)), - is_partial = type == "partial", - hash = unpadded_b64_decode(hash), - ) - except ValueError as e: - # wrap error but preserve original traceback - _, _, tb = sys.exc_info() - raise ValueError, ValueError("Bad path {!r}: {}".format(path, e)), tb - - -@timed( - hours_path=lambda ret, hours_path, start, end: hours_path, - has_holes=lambda ret, hours_path, start, end: None in ret, - normalize=lambda ret, hours_path, start, end: len([x for x in ret if x is not None]), -) -def get_best_segments(hours_path, start, end): - """Return a list of the best sequence of non-overlapping segments - we have for a given time range. Hours path should be the directory containing hour directories. - Time args start and end should be given as datetime objects. - The first segment may start before the time range, and the last may end after it. - The returned list contains items that are either: - SegmentInfo: a segment - None: represents a discontinuity between the previous segment and the next one. - ie. as long as two segments appear next to each other, we guarentee there is no gap between - them, the second one starts right as the first one finishes. - Similarly, unless the first item is None, the first segment starts <= the start of the time - range, and unless the last item is None, the last segment ends >= the end of the time range. - Example: - Suppose you ask for a time range from 10 to 60. We have 10-second segments covering - the following times: - 5 to 15 - 15 to 25 - 30 to 40 - 40 to 50 - Then the output would look like: - segment from 5 to 15 - segment from 15 to 25 - None, as the previous segment ends 5sec before the next one begins - segment from 30 to 40 - segment from 40 to 50 - None, as the previous segment ends 10sec before the requested end time of 60. - Note that any is_partial=True segment will be followed by a None, since we can't guarentee - it joins on to the next segment fully intact. - """ - # Note: The exact equality checks in this function are not vulnerable to floating point error, - # but only because all input dates and durations are only precise to the millisecond, and - # python's datetime types represent these as integer microseconds internally. So the parsing - # to these types is exact, and all operations on them are exact, so all operations are exact. - - result = [] - - for hour in hour_paths_for_range(hours_path, start, end): - # best_segments_by_start will give us the best available segment for each unique start time - for segment in best_segments_by_start(hour): - - # special case: first segment - if not result: - # first segment is allowed to be before start as long as it includes it - if segment.start <= start < segment.end: - # segment covers start - result.append(segment) - elif start < segment.start < end: - # segment is after start (but before end), so there was no segment that covers start - # so we begin with a None - result.append(None) - result.append(segment) - else: - # segment is before start, and doesn't cover start, or starts after end. - # ignore and go to next. - continue - else: - # normal case: check against previous segment end time - prev_end = result[-1].end - if segment.start < prev_end: - # Overlap! This shouldn't happen, though it might be possible due to weirdness - # if the stream drops then starts again quickly. We simply ignore the overlapping - # segment and let the algorithm continue. - logging.warning("Overlapping segments: {} overlaps end of {}".format(segment, result[-1])) - continue - if result[-1].is_partial or prev_end < segment.start: - # there's a gap between prev end and this start, so add a None - result.append(None) - result.append(segment) - - # check if we've reached the end - if end <= segment.end: - break - - # this is a weird little construct that says "if we broke from the inner loop, - # then also break from the outer one. otherwise continue." - else: - continue - break - - # check if we need a trailing None because last segment is partial or doesn't reach end - if result and (result[-1].is_partial or result[-1].end < end): - result.append(None) - - return result - - -def hour_paths_for_range(hours_path, start, end): - """Generate a list of hour paths to check when looking for segments between start and end.""" - # truncate start and end to the hour - def truncate(dt): - return dt.replace(microsecond=0, second=0, minute=0) - current = truncate(start) - end = truncate(end) - # Begin in the hour prior to start, as there may be a segment that starts in that hour - # but contains the start time, eg. if the start time is 01:00:01 and there's a segment - # at 00:59:59 which goes for 3 seconds. - # Checking the entire hour when in most cases it won't be needed is wasteful, but it's also - # pretty quick and the complexity of only checking this case when needed just isn't worth it. - current -= datetime.timedelta(hours=1) - while current <= end: - yield os.path.join(hours_path, current.strftime("%Y-%m-%dT%H")) - current += datetime.timedelta(hours=1) - - -def best_segments_by_start(hour): - """Within a given hour path, yield the "best" segment per unique segment start time. - Best is defined as non-partial, or failing that the longest partial. - Note this means this function may perform os.stat()s in order to find the longest partial. - """ - try: - segment_paths = os.listdir(hour) - except OSError as e: - if e.errno != errno.ENOENT: - raise - # path does not exist, treat it as having no files - return - segment_paths.sort() - # note we only parse them as we need them, which is unlikely to save us much time overall - # but is easy enough to do, so we might as well. - parsed = (parse_segment_path(os.path.join(hour, name)) for name in segment_paths) - for start_time, segments in itertools.groupby(parsed, key=lambda segment: segment.start): - segments = list(segments) - full_segments = [segment for segment in segments if not segment.is_partial] - if full_segments: - if len(full_segments) != 1: - logging.warning("Multiple versions of full segment at start_time {}: {}".format( - start_time, ", ".join(map(str, segments)) - )) - # We've observed some cases where the same segment (with the same hash) will be reported - # with different durations (generally at stream end). Prefer the longer duration, - # as this will ensure that if hashes are different we get the most data, and if they - # are the same it should keep holes to a minimum. - # If same duration, we have to pick one, so pick highest-sorting hash just so we're consistent. - full_segments = [max(full_segments, key=lambda segment: (segment.duration, segment.hash))] - yield full_segments[0] - continue - # no full segments, fall back to measuring partials. - yield max(segments, key=lambda segment: os.stat(segment.path).st_size) - - def rename(old, new): """Atomic rename that succeeds if the target already exists, since we're naming everything by hash anyway, so if the filepath already exists the file itself is already there. diff --git a/common/common/segments.py b/common/common/segments.py new file mode 100644 index 0000000..e03e20c --- /dev/null +++ b/common/common/segments.py @@ -0,0 +1,214 @@ + +"""A place for common utilities between wubloader components""" + + +import base64 +import datetime +import errno +import itertools +import logging +import os +import sys +from collections import namedtuple + +import dateutil.parser + +from .stats import timed + + +def unpadded_b64_decode(s): + """Decode base64-encoded string that has had its padding removed""" + # right-pad with '=' to multiple of 4 + s = s + '=' * (- len(s) % 4) + return base64.b64decode(s, "-_") + + +class SegmentInfo( + namedtuple('SegmentInfoBase', [ + 'path', 'stream', 'variant', 'start', 'duration', 'is_partial', 'hash' + ]) +): + """Info parsed from a segment path, including original path. + Note that start time is a datetime and duration is a timedelta, and hash is a decoded binary string.""" + @property + def end(self): + return self.start + self.duration + + +def parse_segment_path(path): + """Parse segment path, returning a SegmentInfo. If path is only the trailing part, + eg. just a filename, it will leave unknown fields as None.""" + parts = path.split('/') + # left-pad parts with None up to 4 parts + parts = [None] * (4 - len(parts)) + parts + # pull info out of path parts + stream, variant, hour, filename = parts[-4:] + # split filename, which should be TIME-DURATION-TYPE-HASH.ts + try: + if not filename.endswith('.ts'): + raise ValueError("Does not end in .ts") + filename = filename[:-len('.ts')] # chop off .ts + parts = filename.split('-', 3) + if len(parts) != 4: + raise ValueError("Not enough dashes in filename") + time, duration, type, hash = parts + if type not in ('full', 'partial'): + raise ValueError("Unknown type {!r}".format(type)) + return SegmentInfo( + path = path, + stream = stream, + variant = variant, + start = dateutil.parser.parse("{}:{}".format(hour, time)), + duration = datetime.timedelta(seconds=float(duration)), + is_partial = type == "partial", + hash = unpadded_b64_decode(hash), + ) + except ValueError as e: + # wrap error but preserve original traceback + _, _, tb = sys.exc_info() + raise ValueError, ValueError("Bad path {!r}: {}".format(path, e)), tb + + +@timed( + hours_path=lambda ret, hours_path, start, end: hours_path, + has_holes=lambda ret, hours_path, start, end: None in ret, + normalize=lambda ret, hours_path, start, end: len([x for x in ret if x is not None]), +) +def get_best_segments(hours_path, start, end): + """Return a list of the best sequence of non-overlapping segments + we have for a given time range. Hours path should be the directory containing hour directories. + Time args start and end should be given as datetime objects. + The first segment may start before the time range, and the last may end after it. + The returned list contains items that are either: + SegmentInfo: a segment + None: represents a discontinuity between the previous segment and the next one. + ie. as long as two segments appear next to each other, we guarentee there is no gap between + them, the second one starts right as the first one finishes. + Similarly, unless the first item is None, the first segment starts <= the start of the time + range, and unless the last item is None, the last segment ends >= the end of the time range. + Example: + Suppose you ask for a time range from 10 to 60. We have 10-second segments covering + the following times: + 5 to 15 + 15 to 25 + 30 to 40 + 40 to 50 + Then the output would look like: + segment from 5 to 15 + segment from 15 to 25 + None, as the previous segment ends 5sec before the next one begins + segment from 30 to 40 + segment from 40 to 50 + None, as the previous segment ends 10sec before the requested end time of 60. + Note that any is_partial=True segment will be followed by a None, since we can't guarentee + it joins on to the next segment fully intact. + """ + # Note: The exact equality checks in this function are not vulnerable to floating point error, + # but only because all input dates and durations are only precise to the millisecond, and + # python's datetime types represent these as integer microseconds internally. So the parsing + # to these types is exact, and all operations on them are exact, so all operations are exact. + + result = [] + + for hour in hour_paths_for_range(hours_path, start, end): + # best_segments_by_start will give us the best available segment for each unique start time + for segment in best_segments_by_start(hour): + + # special case: first segment + if not result: + # first segment is allowed to be before start as long as it includes it + if segment.start <= start < segment.end: + # segment covers start + result.append(segment) + elif start < segment.start < end: + # segment is after start (but before end), so there was no segment that covers start + # so we begin with a None + result.append(None) + result.append(segment) + else: + # segment is before start, and doesn't cover start, or starts after end. + # ignore and go to next. + continue + else: + # normal case: check against previous segment end time + prev_end = result[-1].end + if segment.start < prev_end: + # Overlap! This shouldn't happen, though it might be possible due to weirdness + # if the stream drops then starts again quickly. We simply ignore the overlapping + # segment and let the algorithm continue. + logging.warning("Overlapping segments: {} overlaps end of {}".format(segment, result[-1])) + continue + if result[-1].is_partial or prev_end < segment.start: + # there's a gap between prev end and this start, so add a None + result.append(None) + result.append(segment) + + # check if we've reached the end + if end <= segment.end: + break + + # this is a weird little construct that says "if we broke from the inner loop, + # then also break from the outer one. otherwise continue." + else: + continue + break + + # check if we need a trailing None because last segment is partial or doesn't reach end + if result and (result[-1].is_partial or result[-1].end < end): + result.append(None) + + return result + + +def hour_paths_for_range(hours_path, start, end): + """Generate a list of hour paths to check when looking for segments between start and end.""" + # truncate start and end to the hour + def truncate(dt): + return dt.replace(microsecond=0, second=0, minute=0) + current = truncate(start) + end = truncate(end) + # Begin in the hour prior to start, as there may be a segment that starts in that hour + # but contains the start time, eg. if the start time is 01:00:01 and there's a segment + # at 00:59:59 which goes for 3 seconds. + # Checking the entire hour when in most cases it won't be needed is wasteful, but it's also + # pretty quick and the complexity of only checking this case when needed just isn't worth it. + current -= datetime.timedelta(hours=1) + while current <= end: + yield os.path.join(hours_path, current.strftime("%Y-%m-%dT%H")) + current += datetime.timedelta(hours=1) + + +def best_segments_by_start(hour): + """Within a given hour path, yield the "best" segment per unique segment start time. + Best is defined as non-partial, or failing that the longest partial. + Note this means this function may perform os.stat()s in order to find the longest partial. + """ + try: + segment_paths = os.listdir(hour) + except OSError as e: + if e.errno != errno.ENOENT: + raise + # path does not exist, treat it as having no files + return + segment_paths.sort() + # note we only parse them as we need them, which is unlikely to save us much time overall + # but is easy enough to do, so we might as well. + parsed = (parse_segment_path(os.path.join(hour, name)) for name in segment_paths) + for start_time, segments in itertools.groupby(parsed, key=lambda segment: segment.start): + segments = list(segments) + full_segments = [segment for segment in segments if not segment.is_partial] + if full_segments: + if len(full_segments) != 1: + logging.warning("Multiple versions of full segment at start_time {}: {}".format( + start_time, ", ".join(map(str, segments)) + )) + # We've observed some cases where the same segment (with the same hash) will be reported + # with different durations (generally at stream end). Prefer the longer duration, + # as this will ensure that if hashes are different we get the most data, and if they + # are the same it should keep holes to a minimum. + # If same duration, we have to pick one, so pick highest-sorting hash just so we're consistent. + full_segments = [max(full_segments, key=lambda segment: (segment.duration, segment.hash))] + yield full_segments[0] + continue + # no full segments, fall back to measuring partials. + yield max(segments, key=lambda segment: os.stat(segment.path).st_size)