|
|
@ -10,7 +10,6 @@ import signal
|
|
|
|
import socket
|
|
|
|
import socket
|
|
|
|
import urlparse
|
|
|
|
import urlparse
|
|
|
|
import uuid
|
|
|
|
import uuid
|
|
|
|
from base64 import b64encode
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import argh
|
|
|
|
import argh
|
|
|
|
import gevent.backdoor
|
|
|
|
import gevent.backdoor
|
|
|
@ -40,8 +39,8 @@ hash_mismatches = prom.Counter(
|
|
|
|
['remote', 'channel', 'quality', 'hour'],
|
|
|
|
['remote', 'channel', 'quality', 'hour'],
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
get_node_errors = prom.Counter(
|
|
|
|
node_list_errors = prom.Counter(
|
|
|
|
'node_fetch_errors',
|
|
|
|
'node_list_errors',
|
|
|
|
'Number of errors fetching a list of nodes',
|
|
|
|
'Number of errors fetching a list of nodes',
|
|
|
|
['filename', 'database'],
|
|
|
|
['filename', 'database'],
|
|
|
|
)
|
|
|
|
)
|
|
|
@ -126,8 +125,7 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
|
|
|
|
f.write(chunk)
|
|
|
|
f.write(chunk)
|
|
|
|
hash.update(chunk)
|
|
|
|
hash.update(chunk)
|
|
|
|
|
|
|
|
|
|
|
|
hash_str = b64encode(hash.digest(), "-_").rstrip("=")
|
|
|
|
if hash != common.parse_segment_path(missing_segment).hash:
|
|
|
|
if hash_str not in missing_segment:
|
|
|
|
|
|
|
|
logger.warn('Hash of segment {} does not match. Discarding segment'.format(missing_segment))
|
|
|
|
logger.warn('Hash of segment {} does not match. Discarding segment'.format(missing_segment))
|
|
|
|
hash_mismatches.labels(remote=node, channel=channel, quality=quality, hour=hour).inc()
|
|
|
|
hash_mismatches.labels(remote=node, channel=channel, quality=quality, hour=hour).inc()
|
|
|
|
os.remove(temp_path)
|
|
|
|
os.remove(temp_path)
|
|
|
@ -244,8 +242,11 @@ class BackfillerManager(object):
|
|
|
|
failures += 1
|
|
|
|
failures += 1
|
|
|
|
delay = common.jitter(TIMEOUT * 2**failures)
|
|
|
|
delay = common.jitter(TIMEOUT * 2**failures)
|
|
|
|
self.logger.exception('Getting nodes failed. Retrying in {:.0f} s'.format(delay))
|
|
|
|
self.logger.exception('Getting nodes failed. Retrying in {:.0f} s'.format(delay))
|
|
|
|
host = [s.split('=')[-1] for s in self.connection.dsn.split() if 'host' in s][0]
|
|
|
|
try:
|
|
|
|
get_node_errors.labels(filename=self.node_file, database=host).inc()
|
|
|
|
host = [s.split('=')[-1] for s in self.connection.dsn.split() if 'host' in s][0]
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
|
|
host = ''
|
|
|
|
|
|
|
|
node_list_errors.labels(filename=self.node_file, database=host).inc()
|
|
|
|
self.stopping.wait(delay)
|
|
|
|
self.stopping.wait(delay)
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
exisiting_nodes = set(self.workers.keys())
|
|
|
|
exisiting_nodes = set(self.workers.keys())
|
|
|
|