From 828b23e12a2a3a06244adf2b3552d94969578435 Mon Sep 17 00:00:00 2001 From: Christopher Usher Date: Fri, 28 Dec 2018 23:21:44 -0800 Subject: [PATCH] moved rename, ensure_directory and jitter to common Move a few useful functions in downloader used in the backfiller to common --- backfiller/backfiller/main.py | 12 +++------ common/common.py | 33 +++++++++++++++++++++++++ downloader/downloader/main.py | 46 ++++++++--------------------------- 3 files changed, 47 insertions(+), 44 deletions(-) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index bda5fa2..e2fedfc 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -29,7 +29,7 @@ def get_nodes(): # nodes so that # as a prototype can just hardcode some addresses. - # each element in nodes is a 'protocol://host:port/' string + # each element in nodes is a 'protocol://host:port/' string nodes = [] @@ -77,10 +77,9 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment, if os.path.exists(path): return - common.ensure_directory(path) - substrs = path.split('-') temp_path = '-'.join(substrs[:-1] + [str(uuid.uuid4()) + '.st']) + common.ensure_directory(temp_path) uri = '{}/segments/{}/{}/{}/{}'.format(node, stream, variant, hour, missing_segment) resp = requests.get(uri, stream=True, timeout=timeout) @@ -95,14 +94,11 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment, def backfill(base_dir, stream, variants, hours=None, nodes=None): - # loop over nodes asking for a list of segments then downloads any - # segments it doesn't have + # loop over nodes backfilling from each if nodes is None: nodes = get_nodes() - - #ideally do this in parallel for node in nodes: @@ -114,7 +110,7 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None): print node, e -def backfill_node(base_dir, node, stream, variants, hours, recent_cutoff=60): +def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60): 
# if hours is int, backfill last hours hourdirs diff --git a/common/common.py b/common/common.py index be6f71b..558181e 100644 --- a/common/common.py +++ b/common/common.py @@ -252,3 +252,36 @@ def best_segments_by_start(hour): continue # no full segments, fall back to measuring partials. yield max(segments, key=lambda segment: os.stat(segment.path).st_size) + + +def rename(old, new): + """Atomic rename that succeeds if the target already exists, since we're naming everything + by hash anyway, so if the filepath already exists the file itself is already there. + In this case, we delete the source file. + """ + try: + os.rename(old, new) + except OSError as e: + if e.errno != errno.EEXIST: + raise + os.remove(old) + +def ensure_directory(path): + """Create directory that contains path, as well as any parent directories, + if they don't already exist.""" + dir_path = os.path.dirname(path) + if os.path.exists(dir_path): + return + ensure_directory(dir_path) + try: + os.mkdir(dir_path) + except OSError as e: + # Ignore if EEXIST. This is needed to avoid a race if two getters run at once. + if e.errno != errno.EEXIST: + raise + +def jitter(interval): + """Apply some 'jitter' to an interval. This is a random +/- 10% change in order to + smooth out patterns and prevent everything from retrying at the same time. + """ + return interval * (0.9 + 0.2 * random.random()) diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 86718e7..0d67ce3 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -17,6 +17,7 @@ import requests from monotonic import monotonic import twitch +import common class TimedOutError(Exception): @@ -57,11 +58,6 @@ def soft_hard_timeout(description, (soft_timeout, hard_timeout), on_soft_timeout finished = True -def jitter(interval): - """Apply some 'jitter' to an interval. This is a random +/- 10% change in order to - smooth out patterns and prevent everything from retrying at the same time.
- """ - return interval * (0.9 + 0.2 * random.random()) class StreamsManager(object): @@ -214,7 +210,7 @@ class StreamsManager(object): self.refresh_needed.clear() gevent.spawn(self.fetch_latest) # wait min retry interval with jitter, unless we're stopping - self.stopping.wait(jitter(self.FETCH_MIN_INTERVAL)) + self.stopping.wait(common.jitter(self.FETCH_MIN_INTERVAL)) logging.info("Stopping workers") for workers in self.stream_workers.values(): for worker in workers: @@ -285,7 +281,7 @@ class StreamWorker(object): def wait(self, interval): """Wait for given interval with jitter, unless we're stopping""" - self.stopping.wait(jitter(interval)) + self.stopping.wait(common.jitter(interval)) def _run(self): first = True @@ -378,7 +374,7 @@ class SegmentGetter(object): self._run() except Exception: logging.exception("Failure in SegmentGetter {}".format(self.segment)) - gevent.sleep(jitter(self.UNEXPECTED_FAILURE_RETRY)) + gevent.sleep(common.jitter(self.UNEXPECTED_FAILURE_RETRY)) else: break finally: @@ -394,7 +390,7 @@ class SegmentGetter(object): if worker.ready() and worker.value: break # if retry not set, wait for FETCH_RETRY first - self.retry.wait(jitter(self.FETCH_RETRY)) + self.retry.wait(common.jitter(self.FETCH_RETRY)) def make_path_prefix(self): """Generate leading part of filepath which doesn't change with the hash.""" @@ -431,19 +427,7 @@ class SegmentGetter(object): full_prefix = "{}-full".format(self.prefix) return any(candidate.startswith(full_prefix) for candidate in candidates) - def ensure_directory(self, path): - """Create directory that contains path, as well as any parent directories, - if they don't already exist.""" - dir_path = os.path.dirname(path) - if os.path.exists(dir_path): - return - self.ensure_directory(dir_path) - try: - os.mkdir(dir_path) - except OSError as e: - # Ignore if EEXISTS. This is needed to avoid a race if two getters run at once. 
- if e.errno != errno.EEXIST: - raise + def get_segment(self): # save current value of self.retry so we can't set any later instance @@ -469,7 +453,7 @@ class SegmentGetter(object): logging.warning("Got 403 Forbidden for segment, giving up: {}".format(self.segment)) return resp.raise_for_status() - self.ensure_directory(temp_path) + common.ensure_directory(temp_path) with open(temp_path, 'w') as f: file_created = True # We read chunk-wise in 8KiB chunks. Note that if the connection cuts halfway, @@ -483,22 +467,12 @@ class SegmentGetter(object): # another exception in the interim ex_type, ex, tb = sys.exc_info() if file_created: - self.rename(temp_path, self.make_path("partial", hash)) + common.rename(temp_path, self.make_path("partial", hash)) raise ex_type, ex, tb else: - self.rename(temp_path, self.make_path("full", hash)) + common.rename(temp_path, self.make_path("full", hash)) + - def rename(self, old, new): - """Atomic rename that succeeds if the target already exists, since we're naming everything - by hash anyway, so if the filepath already exists the file itself is already there. - In this case, we delete the source file. - """ - try: - os.rename(old, new) - except OSError as e: - if e.errno != errno.EEXIST: - raise - os.remove(old) def main(channel, base_dir=".", qualities=""):