|
|
@ -40,6 +40,18 @@ hash_mismatches = prom.Counter(
|
|
|
|
['remote', 'channel', 'quality', 'hour'],
|
|
|
|
['remote', 'channel', 'quality', 'hour'],
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
get_node_errors = prom.Counter(
|
|
|
|
|
|
|
|
'node_fetch_errors',
|
|
|
|
|
|
|
|
'Number of errors fetching a list of nodes',
|
|
|
|
|
|
|
|
['filename', 'database'],
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
backfill_errors = prom.Counter(
|
|
|
|
|
|
|
|
'backfill_errors',
|
|
|
|
|
|
|
|
'Number of errors backfilling',
|
|
|
|
|
|
|
|
['remote', 'channel'],
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
HOUR_FMT = '%Y-%m-%dT%H'
|
|
|
|
HOUR_FMT = '%Y-%m-%dT%H'
|
|
|
|
TIMEOUT = 5 #default timeout in seconds for remote requests or exceptions
|
|
|
|
TIMEOUT = 5 #default timeout in seconds for remote requests or exceptions
|
|
|
|
MAX_BACKOFF = 4 #number of times to back off
|
|
|
|
MAX_BACKOFF = 4 #number of times to back off
|
|
|
@ -100,7 +112,6 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
|
|
|
|
temp_name = "-".join([date, duration, "temp", str(uuid.uuid4())])
|
|
|
|
temp_name = "-".join([date, duration, "temp", str(uuid.uuid4())])
|
|
|
|
temp_path = os.path.join(dir_name, "{}.ts".format(temp_name))
|
|
|
|
temp_path = os.path.join(dir_name, "{}.ts".format(temp_name))
|
|
|
|
common.ensure_directory(temp_path)
|
|
|
|
common.ensure_directory(temp_path)
|
|
|
|
|
|
|
|
|
|
|
|
hash = hashlib.sha256()
|
|
|
|
hash = hashlib.sha256()
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
@ -115,13 +126,12 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
|
|
|
|
f.write(chunk)
|
|
|
|
f.write(chunk)
|
|
|
|
hash.update(chunk)
|
|
|
|
hash.update(chunk)
|
|
|
|
|
|
|
|
|
|
|
|
hash_str = b64encode(hash.digest(), "-_").rstrip("=")
|
|
|
|
hash_str = b64encode(hash.digest(), "-_").rstrip("=")
|
|
|
|
logger.info('{} {}'.format(missing_segment, hash_str))
|
|
|
|
if hash_str not in missing_segment:
|
|
|
|
if hash_str not in missing_segment:
|
|
|
|
logger.warn('Hash of segment {} does not match. Discarding segment'.format(missing_segment))
|
|
|
|
logger.warn('Hash of segment {} does not match. Discarding segment'.format(missing_segment))
|
|
|
|
hash_mismatches.labels(remote=node, channel=channel, quality=quality, hour=hour).inc()
|
|
|
|
hash_mismatches.labels(remote=node, channel=channel, quality=quality, hour=hour).inc()
|
|
|
|
os.remove(temp_path)
|
|
|
|
os.remove(temp_path)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#try to get rid of the temp file if an exception is raised.
|
|
|
|
#try to get rid of the temp file if an exception is raised.
|
|
|
|
except Exception:
|
|
|
|
except Exception:
|
|
|
@ -234,6 +244,8 @@ class BackfillerManager(object):
|
|
|
|
failures += 1
|
|
|
|
failures += 1
|
|
|
|
delay = common.jitter(TIMEOUT * 2**failures)
|
|
|
|
delay = common.jitter(TIMEOUT * 2**failures)
|
|
|
|
self.logger.exception('Getting nodes failed. Retrying in {:.0f} s'.format(delay))
|
|
|
|
self.logger.exception('Getting nodes failed. Retrying in {:.0f} s'.format(delay))
|
|
|
|
|
|
|
|
host = [s.split('=')[-1] for s in self.connection.dsn.split() if 'host' in s][0]
|
|
|
|
|
|
|
|
get_node_errors.labels(filename=self.node_file, database=host).inc()
|
|
|
|
self.stopping.wait(delay)
|
|
|
|
self.stopping.wait(delay)
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
exisiting_nodes = set(self.workers.keys())
|
|
|
|
exisiting_nodes = set(self.workers.keys())
|
|
|
@ -392,7 +404,7 @@ class BackfillerWorker(object):
|
|
|
|
worker.get() # re-raise error, if any
|
|
|
|
worker.get() # re-raise error, if any
|
|
|
|
|
|
|
|
|
|
|
|
self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), quality, hour))
|
|
|
|
self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), quality, hour))
|
|
|
|
hour_backfilled.labels(remote=self.node, channel=self.channel, quality=self.quality).inc()
|
|
|
|
hours_backfilled.labels(remote=self.node, channel=self.channel, quality=quality).inc()
|
|
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
def run(self):
|
|
|
|
self.logger.info('Starting')
|
|
|
|
self.logger.info('Starting')
|
|
|
@ -413,6 +425,7 @@ class BackfillerWorker(object):
|
|
|
|
failures += 1
|
|
|
|
failures += 1
|
|
|
|
delay = common.jitter(TIMEOUT * 2**failures)
|
|
|
|
delay = common.jitter(TIMEOUT * 2**failures)
|
|
|
|
self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay))
|
|
|
|
self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay))
|
|
|
|
|
|
|
|
backfill_errors.labels(remote=self.node, channel=self.channel).inc()
|
|
|
|
self.stopping.wait(delay)
|
|
|
|
self.stopping.wait(delay)
|
|
|
|
|
|
|
|
|
|
|
|
if self.run_once:
|
|
|
|
if self.run_once:
|
|
|
|