backfiller: Only run one manager, not one per channel

Then treat backfilling each channel just like backfilling each quality.

This is conceptually simpler (only one kind of thing, a (channel, quality) pair)
and has better behaviour when a node is down (we only have one lot of error handling around it).

It also means we aren't asking the database for the same info once per channel,
and cuts down on logging noise.
pull/142/head
Mike Lang 5 years ago
parent a6cd07077a
commit 71333cf826

@ -3,6 +3,7 @@
import datetime
import errno
import hashlib
import itertools
import logging
import os
import random
@ -50,7 +51,7 @@ node_list_errors = prom.Counter(
backfill_errors = prom.Counter(
'backfill_errors',
'Number of errors backfilling',
['remote', 'channel'],
['remote'],
)
segments_deleted = prom.Counter(
@ -169,10 +170,10 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
logger.info('Segment {}/{}/{} backfilled'.format(quality, hour, missing_segment))
def list_hours(node, channel, qualities, start=None):
def list_hours(node, channel, quality, start=None):
"""Return a list of all available hours from a node.
List all hours available from node/channel for each quality in qualities
List all hours available from node/channel
ordered from newest to oldest.
Keyword arguments:
@ -180,8 +181,7 @@ def list_hours(node, channel, qualities, start=None):
return hours more recent than that number of hours ago. If None (default),
all hours are returned."""
hour_lists = [list_remote_hours(node, channel, quality) for quality in qualities]
hours = list(set().union(*hour_lists))
hours = list_remote_hours(node, channel, quality)
hours.sort(reverse=True) #latest hour first
if start is not None:
@ -204,7 +204,7 @@ class BackfillerManager(object):
NODE_INTERVAL = 300 #seconds between updating list of nodes
def __init__(self, base_dir, channel, qualities, static_nodes=[],
def __init__(self, base_dir, channels, qualities, static_nodes=[],
start=None, delete_old=False, run_once=False, node_file=None,
node_database=None, localhost=None, download_concurrency=5,
recent_cutoff=120):
@ -212,7 +212,7 @@ class BackfillerManager(object):
Creates a manager for a given channel with specified qualities."""
self.base_dir = base_dir
self.channel = channel
self.channels = channels
self.qualities = qualities
self.static_nodes = static_nodes
self.start = start
@ -225,7 +225,7 @@ class BackfillerManager(object):
self.download_concurrency = download_concurrency
self.recent_cutoff = recent_cutoff
self.stopping = gevent.event.Event()
self.logger = logging.getLogger("BackfillerManager({})".format(channel))
self.logger = logging.getLogger("BackfillerManager")
self.workers = {} # {node url: worker}
def stop(self):
@ -257,8 +257,8 @@ class BackfillerManager(object):
else:
self.logger.info('Deleting hours older than {} hours ago'.format(self.start))
for quality in self.qualities:
hours = list_local_hours(self.base_dir, self.channel, quality)
for channel, quality in itertools.product(self.channels, self.qualities):
hours = list_local_hours(self.base_dir, channel, quality)
if not isinstance(self.start, datetime.datetime):
cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=self.start)
else:
@ -270,13 +270,13 @@ class BackfillerManager(object):
# deleting segments can take a bit time but is less important
# than the actually backfilling so we yield
gevent.idle()
path = os.path.join(self.base_dir, self.channel, quality, hour)
path = os.path.join(self.base_dir, channel, quality, hour)
self.logger.info('Deleting {}'.format(path))
segments = list_local_segments(self.base_dir, self.channel, quality, hour)
segments = list_local_segments(self.base_dir, channel, quality, hour)
for segment in segments:
try:
os.remove(os.path.join(path, segment))
segments_deleted.labels(channel=self.channel, quality=quality, hour=hour).inc()
segments_deleted.labels(channel=channel, quality=quality, hour=hour).inc()
except OSError as e:
# ignore error when the file is already gone
if e.errno != errno.ENOENT:
@ -411,7 +411,7 @@ class BackfillerWorker(object):
self.base_dir = manager.base_dir
self.node = node
self.download_concurrency = manager.download_concurrency
self.channel = manager.channel
self.channels = manager.channels
self.qualities = manager.qualities
self.start = manager.start
self.run_once = manager.run_once
@ -420,7 +420,7 @@ class BackfillerWorker(object):
self.done = gevent.event.Event()
def __repr__(self):
return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.channel)
return '<{} at 0x{:x} for {!r}>'.format(type(self).__name__, id(self), self.node)
__str__ = __repr__
def stop(self):
@ -428,15 +428,14 @@ class BackfillerWorker(object):
self.logger.info('Stopping')
self.stopping.set()
def backfill(self, hours):
def backfill(self):
"""Backfill from remote node.
Backfill from node/channel/qualities to base_dir/channel/qualities for
each hour in hours.
"""
for quality in self.qualities:
for hour in hours:
for channel, quality in itertools.product(self.channels, self.qualities):
for hour in list_hours(self.node, channel, quality, self.start):
# since backfilling can take a long time, recheck whether this
# hour is after the start
if self.start is not None:
@ -449,8 +448,8 @@ class BackfillerWorker(object):
self.logger.info('Backfilling {}/{}'.format(quality, hour))
local_segments = set(list_local_segments(self.base_dir, self.channel, quality, hour))
remote_segments = set(list_remote_segments(self.node, self.channel, quality, hour))
local_segments = set(list_local_segments(self.base_dir, channel, quality, hour))
remote_segments = set(list_remote_segments(self.node, channel, quality, hour))
missing_segments = list(remote_segments - local_segments)
# randomise the order of the segments to reduce the chance that
@ -465,7 +464,7 @@ class BackfillerWorker(object):
if self.stopping.is_set():
return
path = os.path.join(self.channel, quality, hour, missing_segment)
path = os.path.join(channel, quality, hour, missing_segment)
# test to see if file is a segment and get the segments start time
try:
@ -488,7 +487,7 @@ class BackfillerWorker(object):
# start segment as soon as a pool slot opens up, then track it in workers
workers.append(pool.spawn(
get_remote_segment,
self.base_dir, self.node, self.channel, quality, hour, missing_segment, self.logger
self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger
))
# verify that all the workers succeeded. if any failed, raise the exception from
@ -497,7 +496,7 @@ class BackfillerWorker(object):
worker.get() # re-raise error, if any
self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), quality, hour))
hours_backfilled.labels(remote=self.node, channel=self.channel, quality=quality).inc()
hours_backfilled.labels(remote=self.node, channel=channel, quality=quality).inc()
def run(self):
@ -507,7 +506,7 @@ class BackfillerWorker(object):
while not self.stopping.is_set():
try:
self.logger.info('Starting backfill')
self.backfill(list_hours(self.node, self.channel, self.qualities, self.start))
self.backfill()
self.logger.info('Backfill complete')
failures = 0 #reset failure count on a successful backfill
if not self.run_once:
@ -518,7 +517,7 @@ class BackfillerWorker(object):
failures += 1
delay = common.jitter(TIMEOUT * 2**failures)
self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay))
backfill_errors.labels(remote=self.node, channel=self.channel).inc()
backfill_errors.labels(remote=self.node).inc()
self.stopping.wait(delay)
if self.run_once:
@ -566,18 +565,12 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002,
common.install_stacksampler()
prom.start_http_server(metrics_port)
managers = []
workers = []
for channel in channels:
logging.info('Starting backfilling {} with {} as qualities to {}'.format(channel, ', '.join(qualities), base_dir))
manager = BackfillerManager(base_dir, channel, qualities, static_nodes,
logging.info('Starting backfilling {} with {} as qualities to {}'.format(', '.join(channels), ', '.join(qualities), base_dir))
manager = BackfillerManager(base_dir, channels, qualities, static_nodes,
start, delete_old, run_once, node_file, node_database,
localhost, download_concurrency, recent_cutoff)
managers.append(manager)
workers.append(gevent.spawn(manager.run))
def stop():
for manager in managers:
manager.stop()
gevent.signal(signal.SIGTERM, stop)
@ -585,19 +578,6 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002,
if backdoor_port:
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
# Wait for any to die
gevent.wait(workers, count=1)
# If one has stopped, either:
# 1. stop() was called and all are stopping
# 2. one errored and we should stop all remaining and report the error
# Our behaviour in both cases is the same:
# 1. Tell all managers to gracefully stop
stop()
# 2. Wait (with timeout) until they've stopped
gevent.wait(workers)
# 3. Check if any of them failed. If they did, report it. If multiple
# failed, we report one arbitrarily.
for worker in workers:
worker.get()
manager.run()
logging.info('Gracefully stopped')

Loading…
Cancel
Save