From a52daea9949ff4f5199c24f27c52d344fd05221a Mon Sep 17 00:00:00 2001
From: Chris Usher
Date: Sun, 13 Jan 2019 13:13:00 +0000
Subject: [PATCH] reintroduced a start time for the backfiller; more logging

---
 backfiller/backfiller/main.py | 31 +++++++++++++++++++++----------
 backfiller/setup.py           |  1 +
 2 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py
index 231b29d..8788c30 100644
--- a/backfiller/backfiller/main.py
+++ b/backfiller/backfiller/main.py
@@ -9,6 +9,7 @@ import random
 import time
 import uuid
 
+import dateutil.parser
 import gevent.backdoor
 import prometheus_client as prom
 import requests
@@ -97,10 +98,10 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
 	locally, this does not attempt to fetch it."""
 
 	path = os.path.join(base_dir, stream, variant, hour, missing_segment)
-	logging.debug('Getting segment {}'.format(path))
 	# check to see if file was created since we listed the local segments to
 	# avoid unnecessarily copying
 	if os.path.exists(path):
+		logging.debug('Skipping existing segment {}'.format(path))
 		return
 
 	dir_name = os.path.dirname(path)
@@ -110,6 +111,7 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
 	common.ensure_directory(temp_path)
 
 	try:
+		logging.debug('Fetching segment {} from {}'.format(path, node))
 		uri = '{}/segments/{}/{}/{}/{}'.format(node, stream, variant, hour, missing_segment)
 		resp = requests.get(uri, stream=True, timeout=timeout)
 
@@ -129,7 +131,7 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
 	segments_backfilled.labels(remote=node, stream=stream, variant=variant, hour=hour).inc()
 
 
-def backfill(base_dir, stream, variants, hours=None, nodes=None):
+def backfill(base_dir, stream, variants, hours=None, nodes=None, start=None):
 	"""Loop over nodes backfilling from each.
 
 	Backfill from node/stream/variants to base_dir/stream/variants for each node
@@ -157,7 +159,7 @@ def is_iterable(x):
 	return True
 
 
-def backfill_node(base_dir, node, stream, variants, hours=None, segment_order='random', recent_cutoff=60):
+def backfill_node(base_dir, node, stream, variants, hours=None, segment_order='random', recent_cutoff=60, start=None):
 	"""Backfill from remote node.
 
 	Backfill from node/stream/variants to base_dir/stream/variants.
@@ -170,9 +172,10 @@ def backfill_node(base_dir, node, stream, variants, hours=None, segment_order='r
 		If 'forward', sort the segment in ascending order. If 'reverse', sort
 		the segments in descending order. Otherwise, do not change the order of
 		segments.
-		use start and stop to limit which segments are backfilled.
 	recent_cutoff -- Skip backfilling segments younger than this number of
-		seconds to prioritise letting the downloader grab these segments."""
+		seconds to prioritise letting the downloader grab these segments.
+	start -- Do not backfill hours starting before this time. If None (default),
+		all hours are backfilled."""
 
 	logging.info('Starting backfilling from {}'.format(node))
 
@@ -194,6 +197,11 @@ def backfill_node(base_dir, node, stream, variants, hours=None, segment_order='r
 
 	for variant in variants:
 		for hour in hours:
+			hour_time = datetime.datetime.strptime(hour, HOUR_FMT)
+			if start is not None and hour_time < start:
+				logging.debug('Skipping {}/{}/{} as before start'.format(stream, variant, hour))
+				continue
+
 			logging.info('Backfilling {}/{}/{}'.format(stream, variant, hour))
 
 			local_segments = set(list_local_segments(base_dir, stream, variant, hour))
@@ -221,6 +229,7 @@ def backfill_node(base_dir, node, stream, variants, hours=None, segment_order='r
 
 				#to avoid getting in the downloader's way ignore segments less than recent_cutoff old
 				if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=recent_cutoff):
+					logging.debug('Skipping {} as too recent'.format(path))
 					continue
 
 				get_remote_segment(base_dir, node, stream, variant, hour, missing_segment)
@@ -229,7 +238,7 @@ def backfill_node(base_dir, node, stream, variants, hours=None, segment_order='r
 	logging.info('Finished backfilling from {}'.format(node))
 
 
-def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, sleep_time=1, metrics_port=8002, nodes=None, backdoor_port=0):
+def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, sleep_time=1, metrics_port=8002, nodes=None, backdoor_port=0, start=None):
 	"""Prototype backfiller service.
 
 	Do a backfill of the last 3 hours from stream/variants from all nodes
@@ -245,6 +254,8 @@ def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180,
 	variants = variants.split(',') if variants else []
 	if nodes is not None:
 		nodes = nodes.split(',') if nodes else []
+	if start is not None:
+		start = dateutil.parser.parse(start)
 
 	common.PromLogCountsHandler.install()
 	common.install_stacksampler()
@@ -259,9 +270,9 @@ def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180,
 
 	full_fill_start = fill_start
 
-	backfill(base_dir, stream, variants, 3, nodes=nodes)
+	backfill(base_dir, stream, variants, 3, nodes=nodes, start=start)
 
-	backfill(base_dir, stream, variants, nodes=nodes)
+	backfill(base_dir, stream, variants, nodes=nodes, start=start)
 
 	# I'm sure there is a module that does this in a more robust way
 	# but I understand this and it gives the behaviour I want
@@ -271,14 +282,14 @@ def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180,
 
 		if now - full_fill_start > datetime.timedelta(minutes=full_fill_wait):
 
-			backfill(base_dir, stream, variants, nodes=nodes)
+			backfill(base_dir, stream, variants, nodes=nodes, start=start)
 
 			fill_start = now
 			full_fill_start = fill_start
 
 		elif now - fill_start > datetime.timedelta(minutes=fill_wait):
 
-			backfill(base_dir, stream, variants, 3, nodes=nodes)
+			backfill(base_dir, stream, variants, 3, nodes=nodes, start=start)
 
 			fill_start = now
 
diff --git a/backfiller/setup.py b/backfiller/setup.py
index 1b4ee32..d8fceba 100644
--- a/backfiller/setup.py
+++ b/backfiller/setup.py
@@ -7,6 +7,7 @@ setup(
 	install_requires = [
 		"argh",
 		"gevent",
+		"python-dateutil",
 		"requests",
 		"wubloader-common",
 	],
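
Not part of the patch: a minimal sketch of how the new start cutoff is expected to behave, for anyone reviewing the hour comparison above. It assumes HOUR_FMT is the '%Y-%m-%dT%H' format used for the backfiller's hour directory names, and the helper name should_skip_hour is hypothetical; only the strptime/dateutil comparison mirrors the patch.

# Illustrative sketch only -- not from the patch.
# Assumes HOUR_FMT matches the hour directory naming; should_skip_hour is a made-up helper.
import datetime
import dateutil.parser

HOUR_FMT = '%Y-%m-%dT%H'  # assumed format of hour directory names

def should_skip_hour(hour, start):
	"""Mirror the patch's check: skip hours that begin before start."""
	if start is None:
		return False
	hour_time = datetime.datetime.strptime(hour, HOUR_FMT)
	return hour_time < start

# A --start value such as '2019-01-13T00:00' would be parsed once with
# dateutil.parser.parse in main(), then compared against each hour.
start = dateutil.parser.parse('2019-01-13T00:00')
print(should_skip_hour('2019-01-12T23', start))  # True: hour is before start, skipped
print(should_skip_hour('2019-01-13T05', start))  # False: hour is backfilled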