moved rename, ensure_directory and jitter to common

Move a few useful functions from downloader that are also used by the backfiller into common
pull/18/head
Christopher Usher 6 years ago committed by Mike Lang
parent 7d26997b1f
commit 3cdfaad664

@ -77,10 +77,9 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
if os.path.exists(path): if os.path.exists(path):
return return
common.ensure_directory(path)
substrs = path.split('-') substrs = path.split('-')
temp_path = '-'.join(substrs[:-1] + [str(uuid.uuid4()) + '.st']) temp_path = '-'.join(substrs[:-1] + [str(uuid.uuid4()) + '.st'])
common.ensure_directory(temp_path)
uri = '{}/segments/{}/{}/{}/{}'.format(node, stream, variant, hour, missing_segment) uri = '{}/segments/{}/{}/{}/{}'.format(node, stream, variant, hour, missing_segment)
resp = requests.get(uri, stream=True, timeout=timeout) resp = requests.get(uri, stream=True, timeout=timeout)
@ -95,14 +94,11 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
def backfill(base_dir, stream, variants, hours=None, nodes=None): def backfill(base_dir, stream, variants, hours=None, nodes=None):
# loop over nodes asking for a list of segments then downloads any # loop over nodes backfilling from each
# segments it doesn't have
if nodes is None: if nodes is None:
nodes = get_nodes() nodes = get_nodes()
#ideally do this in parallel #ideally do this in parallel
for node in nodes: for node in nodes:
@ -114,7 +110,7 @@ def backfill(base_dir, stream, variants, hours=None, nodes=None):
print node, e print node, e
def backfill_node(base_dir, node, stream, variants, hours, recent_cutoff=60): def backfill_node(base_dir, node, stream, variants, hours=None, recent_cutoff=60):
# if hours is int, backfill last hours hourdirs # if hours is int, backfill last hours hourdirs

@ -252,3 +252,36 @@ def best_segments_by_start(hour):
continue continue
# no full segments, fall back to measuring partials. # no full segments, fall back to measuring partials.
yield max(segments, key=lambda segment: os.stat(segment.path).st_size) yield max(segments, key=lambda segment: os.stat(segment.path).st_size)
def rename(old, new):
	"""Move old to new, tolerating the destination already existing.

	Files here are named by content hash, so if new already exists it must
	hold the same bytes as old; in that case the duplicate source is deleted.
	Note that on POSIX os.rename() silently replaces an existing file, so the
	EEXIST branch only fires on platforms where rename refuses to clobber.
	"""
	try:
		os.rename(old, new)
	except OSError as e:
		# EEXIST: target is already there and (by hash-naming) identical,
		# so just drop the now-redundant source file.
		if e.errno == errno.EEXIST:
			os.remove(old)
		else:
			raise
def ensure_directory(path):
	"""Create the directory that contains path, as well as any parent
	directories, if they don't already exist.

	path is a file path: only its dirname is created, never path itself.
	"""
	dir_path = os.path.dirname(path)
	# An empty dirname means path is a bare relative filename with no
	# directory component, so there is nothing to create. Without this
	# guard the recursion below never terminates, since
	# os.path.dirname('') == '' and os.path.exists('') is False.
	if not dir_path or os.path.exists(dir_path):
		return
	# Recurse so every ancestor exists before we mkdir dir_path itself.
	ensure_directory(dir_path)
	try:
		os.mkdir(dir_path)
	except OSError as e:
		# Ignore if EEXISTS. This is needed to avoid a race if two getters run at once.
		if e.errno != errno.EEXIST:
			raise
def jitter(interval):
	"""Scale interval by a uniformly random factor in [0.9, 1.1).

	The +/- 10% fuzz smooths out patterns and prevents many workers from
	all retrying at exactly the same time.
	"""
	scale = 0.9 + 0.2 * random.random()
	return interval * scale

@ -18,6 +18,7 @@ import requests
from monotonic import monotonic from monotonic import monotonic
import twitch import twitch
import common
class TimedOutError(Exception): class TimedOutError(Exception):
@ -58,11 +59,6 @@ def soft_hard_timeout(description, (soft_timeout, hard_timeout), on_soft_timeout
finished = True finished = True
def jitter(interval):
"""Apply some 'jitter' to an interval. This is a random +/- 10% change in order to
smooth out patterns and prevent everything from retrying at the same time.
"""
return interval * (0.9 + 0.2 * random.random())
class StreamsManager(object): class StreamsManager(object):
@ -221,7 +217,7 @@ class StreamsManager(object):
self.refresh_needed.clear() self.refresh_needed.clear()
gevent.spawn(self.fetch_latest) gevent.spawn(self.fetch_latest)
# wait min retry interval with jitter, unless we're stopping # wait min retry interval with jitter, unless we're stopping
self.stopping.wait(jitter(self.FETCH_MIN_INTERVAL)) self.stopping.wait(common.jitter(self.FETCH_MIN_INTERVAL))
logging.info("Stopping workers") logging.info("Stopping workers")
for workers in self.stream_workers.values(): for workers in self.stream_workers.values():
for worker in workers: for worker in workers:
@ -292,7 +288,7 @@ class StreamWorker(object):
def wait(self, interval): def wait(self, interval):
"""Wait for given interval with jitter, unless we're stopping""" """Wait for given interval with jitter, unless we're stopping"""
self.stopping.wait(jitter(interval)) self.stopping.wait(common.jitter(interval))
def _run(self): def _run(self):
first = True first = True
@ -386,7 +382,7 @@ class SegmentGetter(object):
self._run() self._run()
except Exception: except Exception:
logging.exception("Failure in SegmentGetter {}".format(self.segment)) logging.exception("Failure in SegmentGetter {}".format(self.segment))
gevent.sleep(jitter(self.UNEXPECTED_FAILURE_RETRY)) gevent.sleep(common.jitter(self.UNEXPECTED_FAILURE_RETRY))
else: else:
break break
finally: finally:
@ -402,7 +398,7 @@ class SegmentGetter(object):
if worker.ready() and worker.value: if worker.ready() and worker.value:
break break
# if retry not set, wait for FETCH_RETRY first # if retry not set, wait for FETCH_RETRY first
self.retry.wait(jitter(self.FETCH_RETRY)) self.retry.wait(common.jitter(self.FETCH_RETRY))
def make_path_prefix(self): def make_path_prefix(self):
"""Generate leading part of filepath which doesn't change with the hash.""" """Generate leading part of filepath which doesn't change with the hash."""
@ -440,20 +436,6 @@ class SegmentGetter(object):
full_prefix = "{}-full".format(self.prefix) full_prefix = "{}-full".format(self.prefix)
return any(candidate.startswith(full_prefix) for candidate in candidates) return any(candidate.startswith(full_prefix) for candidate in candidates)
def ensure_directory(self, path):
	"""Create the directory that contains path, as well as any parent
	directories, if they don't already exist.

	path is a file path: only its dirname is created, never path itself.
	"""
	dir_path = os.path.dirname(path)
	# Empty dirname: path is a bare relative filename, nothing to create.
	# Without this guard the recursion never terminates, because
	# os.path.dirname('') == '' and os.path.exists('') is False.
	if not dir_path or os.path.exists(dir_path):
		return
	# call recursively to check entire path exists
	self.ensure_directory(dir_path)
	try:
		os.mkdir(dir_path)
	except OSError as e:
		# Ignore if EEXISTS. This is needed to avoid a race if two getters run at once.
		if e.errno != errno.EEXIST:
			raise
def get_segment(self): def get_segment(self):
# save current value of self.retry so we can't set any later instance # save current value of self.retry so we can't set any later instance
@ -479,7 +461,7 @@ class SegmentGetter(object):
logging.warning("Got 403 Forbidden for segment, giving up: {}".format(self.segment)) logging.warning("Got 403 Forbidden for segment, giving up: {}".format(self.segment))
return return
resp.raise_for_status() resp.raise_for_status()
self.ensure_directory(temp_path) ensure_directory(temp_path)
with open(temp_path, 'w') as f: with open(temp_path, 'w') as f:
file_created = True file_created = True
# We read chunk-wise in 8KiB chunks. Note that if the connection cuts halfway, # We read chunk-wise in 8KiB chunks. Note that if the connection cuts halfway,
@ -493,22 +475,12 @@ class SegmentGetter(object):
# another exception in the interim # another exception in the interim
ex_type, ex, tb = sys.exc_info() ex_type, ex, tb = sys.exc_info()
if file_created: if file_created:
self.rename(temp_path, self.make_path("partial", hash)) common.rename(temp_path, self.make_path("partial", hash))
raise ex_type, ex, tb raise ex_type, ex, tb
else: else:
self.rename(temp_path, self.make_path("full", hash)) common.rename(temp_path, self.make_path("full", hash))
def rename(self, old, new):
	"""Move old to new, tolerating the destination already existing.

	Everything is named by hash, so an existing new must already contain
	the same bytes; the duplicate source is simply deleted in that case.
	On POSIX os.rename() replaces silently, so the EEXIST branch only
	fires where rename refuses to clobber an existing target.
	"""
	try:
		os.rename(old, new)
	except OSError as e:
		# EEXIST: target already present and identical by hash-naming;
		# discard the redundant source instead of failing.
		if e.errno == errno.EEXIST:
			os.remove(old)
		else:
			raise
def main(channel, base_dir=".", qualities=""): def main(channel, base_dir=".", qualities=""):

Loading…
Cancel
Save