From 34785d1179331b3bb931583e52d2a00fe87c09a2 Mon Sep 17 00:00:00 2001 From: Christopher Usher Date: Sun, 6 Oct 2019 17:47:01 +0100 Subject: [PATCH] now checking the hashes --- backfiller/backfiller/main.py | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py index e08a857..e256182 100644 --- a/backfiller/backfiller/main.py +++ b/backfiller/backfiller/main.py @@ -40,6 +40,18 @@ hash_mismatches = prom.Counter( ['remote', 'channel', 'quality', 'hour'], ) +get_node_errors = prom.Counter( + 'node_fetch_errors', + 'Number of errors fetching a list of nodes', + ['filename', 'database'], +) + +backfill_errors = prom.Counter( + 'backfill_errors', + 'Number of errors backfilling', + ['remote', 'channel'], +) + HOUR_FMT = '%Y-%m-%dT%H' TIMEOUT = 5 #default timeout in seconds for remote requests or exceptions MAX_BACKOFF = 4 #number of times to back off @@ -100,7 +112,6 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment, temp_name = "-".join([date, duration, "temp", str(uuid.uuid4())]) temp_path = os.path.join(dir_name, "{}.ts".format(temp_name)) common.ensure_directory(temp_path) - hash = hashlib.sha256() try: @@ -115,13 +126,12 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment, f.write(chunk) hash.update(chunk) - hash_str = b64encode(hash.digest(), "-_").rstrip("=") - logger.info('{} {}'.format(missing_segment, hash_str)) - if hash_str not in missing_segment: - logger.warn('Hash of segment {} does not match. Discarding segment'.format(missing_segment)) - hash_mismatches.labels(remote=node, channel=channel, quality=quality, hour=hour).inc() - os.remove(temp_path) - return + hash_str = b64encode(hash.digest(), "-_").rstrip("=") + if hash_str not in missing_segment: + logger.warn('Hash of segment {} does not match. Discarding segment'.format(missing_segment)) + hash_mismatches.labels(remote=node, channel=channel, quality=quality, hour=hour).inc() + os.remove(temp_path) + return #try to get rid of the temp file if an exception is raised. except Exception: @@ -234,6 +244,8 @@ class BackfillerManager(object): failures += 1 delay = common.jitter(TIMEOUT * 2**failures) self.logger.exception('Getting nodes failed. Retrying in {:.0f} s'.format(delay)) + host = [s.split('=')[-1] for s in self.connection.dsn.split() if 'host' in s][0] + get_node_errors.labels(filename=self.node_file, database=host).inc() self.stopping.wait(delay) continue exisiting_nodes = set(self.workers.keys()) @@ -392,7 +404,7 @@ class BackfillerWorker(object): worker.get() # re-raise error, if any self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), quality, hour)) - hour_backfilled.labels(remote=self.node, channel=self.channel, quality=self.quality).inc() + hours_backfilled.labels(remote=self.node, channel=self.channel, quality=quality).inc() def run(self): self.logger.info('Starting') @@ -413,6 +425,7 @@ class BackfillerWorker(object): failures += 1 delay = common.jitter(TIMEOUT * 2**failures) self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay)) + backfill_errors.labels(remote=self.node, channel=self.channel).inc() self.stopping.wait(delay) if self.run_once: