diff --git a/backfiller/backfiller/main.py b/backfiller/backfiller/main.py
index 6b48ad5..df24588 100644
--- a/backfiller/backfiller/main.py
+++ b/backfiller/backfiller/main.py
@@ -32,16 +32,16 @@
 TIMEOUT = 5 #default timeout in seconds for remote requests or exceptions
 MAX_BACKOFF = 4 #number of times to back off

-def list_local_segments(base_dir, stream, variant, hour):
+def list_local_segments(base_dir, channel, quality, hour):
     """List segments in a given hour directory.

-    For a given base_dir/stream/variant/hour directory return a list of
+    For a given base_dir/channel/quality/hour directory return a list of
     non-hidden files. If the directory path is not found, return an empty
     list.

     Based on restreamer.list_segments. We could just call
     restreamer.list_segments but this avoids HTTP/JSON overheads."""

-    path = os.path.join(base_dir, stream, variant, hour)
+    path = os.path.join(base_dir, channel, quality, hour)
     try:
         return [name for name in os.listdir(path) if not name.startswith('.')]
@@ -51,31 +51,31 @@ def list_local_segments(base_dir, channel, quality, hour):
         return []

-def list_remote_hours(node, stream, variant, timeout=TIMEOUT):
+def list_remote_hours(node, channel, quality, timeout=TIMEOUT):
     """Wrapper around a call to restreamer.list_hours."""
-    uri = '{}/files/{}/{}'.format(node, stream, variant)
+    uri = '{}/files/{}/{}'.format(node, channel, quality)
     logging.debug('Getting list of hours from {}'.format(uri))
     resp = requests.get(uri, timeout=timeout)
     return common.encode_strings(resp.json())

-def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
+def list_remote_segments(node, channel, quality, hour, timeout=TIMEOUT):
     """Wrapper around a call to restreamer.list_segments."""
-    uri = '{}/files/{}/{}/{}'.format(node, stream, variant, hour)
+    uri = '{}/files/{}/{}/{}'.format(node, channel, quality, hour)
     logging.debug('Getting list of segments from {}'.format(uri))
     resp = requests.get(uri, timeout=timeout)
     return common.encode_strings(resp.json())

-def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
+def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
         timeout=TIMEOUT):
     """Get a segment from a node.

-    Fetches stream/variant/hour/missing_segment from node and puts it in
-    base_dir/stream/variant/hour/missing_segment. If the segment already exists
+    Fetches channel/quality/hour/missing_segment from node and puts it in
+    base_dir/channel/quality/hour/missing_segment. If the segment already exists
     locally, this does not attempt to fetch it."""

-    path = os.path.join(base_dir, stream, variant, hour, missing_segment)
+    path = os.path.join(base_dir, channel, quality, hour, missing_segment)
     # check to see if file was created since we listed the local segments to
     # avoid unnecessarily copying
     if os.path.exists(path):
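For orientation between these hunks: a backfill worker diffs a peer's segment listing against its own and fetches only the gap. A minimal sketch of that set arithmetic, with a hypothetical node URL and channel/quality/hour values standing in for real manager state:

```python
import requests

# Hypothetical values; the real code gets these from the BackfillerManager.
node = 'http://node.example.com'
channel, quality, hour = 'example_channel', 'source', '2019-01-01T00'

# The same /files endpoint the wrappers above call.
uri = '{}/files/{}/{}/{}'.format(node, channel, quality, hour)
remote_segments = set(requests.get(uri, timeout=5).json())

local_segments = set()  # stand-in for list_local_segments(base_dir, channel, quality, hour)
missing_segments = remote_segments - local_segments  # only these are fetched
```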
@@ -90,8 +90,8 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,

     try:
         logging.debug('Fetching segment {} from {}'.format(path, node))
-        uri = '{}/segments/{}/{}/{}/{}'.format(node, stream, variant, hour, missing_segment)
+        uri = '{}/segments/{}/{}/{}/{}'.format(node, channel, quality, hour, missing_segment)
         resp = requests.get(uri, stream=True, timeout=timeout)

         resp.raise_for_status()
@@ -106,13 +106,13 @@ def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
             raise
     logging.debug('Saving completed segment {} as {}'.format(temp_path, path))
     common.rename(temp_path, path)
-    segments_backfilled.labels(remote=node, stream=stream, variant=variant, hour=hour).inc()
+    segments_backfilled.labels(remote=node, channel=channel, quality=quality, hour=hour).inc()

-def list_hours(node, stream, variants, start=None):
+def list_hours(node, channel, qualities, start=None):
     """Return a list of all available hours from a node.

-    List all hours available from node/stream for each variant in variants
+    List all hours available from node/channel for each quality in qualities
     ordered from newest to oldest.

     Keyword arguments:
@@ -120,7 +120,7 @@
     return hours more recent than that number of hours ago. If None (default),
     all hours are returned."""

-    hour_lists = [list_remote_hours(node, stream, variant) for variant in variants]
+    hour_lists = [list_remote_hours(node, channel, quality) for quality in qualities]
     hours = list(set().union(*hour_lists))
     hours.sort(reverse=True) #latest hour first

@@ -142,15 +142,15 @@ class BackfillerManager(object):
     NODE_INTERVAL = 300 #seconds between updating list of nodes

-    def __init__(self, base_dir, stream, variants, static_nodes=[], start=None,
-            run_once=False, node_file=None, node_database=None, localhost=None,
-            download_concurrency=5, recent_cutoff=120):
+    def __init__(self, base_dir, channel, qualities, static_nodes=[],
+            start=None, run_once=False, node_file=None, node_database=None,
+            localhost=None, download_concurrency=5, recent_cutoff=120):
         """Constructor for BackfillerManager.
-        Creates a manager for a given stream with specified variants. If
-        static_nodes is None, manager"""
+
+        Creates a manager for a given channel with specified qualities."""
         self.base_dir = base_dir
-        self.stream = stream
-        self.variants = variants
+        self.channel = channel
+        self.qualities = qualities
         self.static_nodes = static_nodes
         self.start = start
         self.run_once = run_once
@@ -162,7 +162,7 @@
         self.download_concurrency = download_concurrency
         self.recent_cutoff = recent_cutoff
         self.stopping = gevent.event.Event()
-        self.logger = logging.getLogger("BackfillerManager({})".format(stream))
+        self.logger = logging.getLogger("BackfillerManager({})".format(channel))
         self.workers = {} # {node url: worker}

     def stop(self):
@@ -276,8 +276,8 @@ class BackfillerWorker(object):
     """Backfills segments from a node.

-    Backfills every WAIT_INTERVAL all segments from node/stream to
-    base_dir/stream for all variants. If run_once, only backfill once.
+    Backfills every WAIT_INTERVAL all segments from node/channel to
+    base_dir/channel for all qualities. If run_once, only backfill once.

     recent_cutoff -- Skip backfilling segments younger than this number of
     seconds to prioritise letting the downloader grab these segments.
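A note on the fetch hunk above: in `requests.get`, `stream=True` is the requests keyword argument that streams the response body instead of buffering it all in memory, so it must not be caught up in the stream-to-channel rename. The function wraps this in a download-to-temp-then-rename dance so readers never observe a half-written segment. A minimal sketch of that pattern, with `os.rename` standing in for `common.rename` and hypothetical temp naming:

```python
import os
import uuid
import requests

def fetch_atomically(uri, path, timeout=5):
    # hypothetical temp naming; the real code builds temp_path similarly
    temp_path = os.path.join(os.path.dirname(path), '.tmp-{}'.format(uuid.uuid4()))
    resp = requests.get(uri, stream=True, timeout=timeout)  # stream the body, don't buffer it
    resp.raise_for_status()
    try:
        with open(temp_path, 'wb') as f:
            for chunk in resp.iter_content(8192):
                f.write(chunk)
    except Exception:
        if os.path.exists(temp_path):
            os.remove(temp_path)  # don't leave a partial download behind
        raise
    os.rename(temp_path, path)  # atomic on POSIX, so readers never see a partial file
```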
@@ -291,8 +291,8 @@
         self.base_dir = manager.base_dir
         self.node = node
         self.download_concurrency = manager.download_concurrency
-        self.stream = manager.stream
-        self.variants = manager.variants
+        self.channel = manager.channel
+        self.qualities = manager.qualities
         self.start = manager.start
         self.run_once = manager.run_once
         self.recent_cutoff = manager.recent_cutoff
@@ -300,7 +300,7 @@
         self.done = gevent.event.Event()

     def __repr__(self):
-        return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.stream)
+        return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.channel)
     __str__ = __repr__

     def stop(self):
@@ -311,17 +311,17 @@
     def backfill(self, hours):
         """Backfill from remote node.

-        Backfill from node/stream/variants to base_dir/stream/variants for each
-        hour in hours.
+        Backfill from node/channel/qualities to base_dir/channel/qualities for
+        each hour in hours.
         """
-        for variant in self.variants:
+        for quality in self.qualities:

             for hour in hours:
-                self.logger.debug('Backfilling {}/{}'.format(variant, hour))
+                self.logger.debug('Backfilling {}/{}'.format(quality, hour))

-                local_segments = set(list_local_segments(self.base_dir, self.stream, variant, hour))
-                remote_segments = set(list_remote_segments(self.node, self.stream, variant, hour))
+                local_segments = set(list_local_segments(self.base_dir, self.channel, quality, hour))
+                remote_segments = set(list_remote_segments(self.node, self.channel, quality, hour))
                 missing_segments = list(remote_segments - local_segments)

                 # randomise the order of the segments to reduce the chance that
@@ -336,7 +336,7 @@
                     if self.stopping.is_set():
                         return

-                    path = os.path.join(self.stream, variant, hour, missing_segment)
+                    path = os.path.join(self.channel, quality, hour, missing_segment)

                     # test to see if file is a segment and get the segments start time
                     try:
@@ -359,7 +359,7 @@
                     # start segment as soon as a pool slot opens up, then track it in workers
                     workers.append(pool.spawn(
                         get_remote_segment,
-                        self.base_dir, self.node, self.stream, variant, hour, missing_segment
+                        self.base_dir, self.node, self.channel, quality, hour, missing_segment
                     ))

                 # verify that all the workers succeeded. If any failed, raise the exception from
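The backfill loop above relies on gevent's bounded pool for its concurrency: `pool.spawn` blocks once `download_concurrency` greenlets are in flight, and `worker.get()` re-raises anything a greenlet raised. A self-contained sketch of the pattern (the `download` stub is hypothetical):

```python
import gevent
import gevent.pool

def download(name):
    gevent.sleep(0.1)  # stand-in for get_remote_segment
    return name

pool = gevent.pool.Pool(5)  # matches the default download_concurrency
workers = [pool.spawn(download, name) for name in ['a.ts', 'b.ts', 'c.ts']]
for worker in workers:
    worker.get()  # re-raise error, if any
```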
@@ -367,7 +367,7 @@
                 for worker in workers:
                     worker.get() # re-raise error, if any

-                self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), variant, hour))
+                self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), quality, hour))

     def run(self):
         self.logger.info('Starting')
@@ -376,7 +376,7 @@
         while not self.stopping.is_set():
             try:
-                self.backfill(list_hours(self.node, self.stream, self.variants, self.start))
+                self.backfill(list_hours(self.node, self.channel, self.qualities, self.start))
                 failures = 0 #reset failure count on a successful backfill
                 if not self.run_once:
                     self.stopping.wait(common.jitter(self.WAIT_INTERVAL))
@@ -396,9 +396,9 @@ class BackfillerWorker(object):
         if self.node in self.manager.workers:
             del self.manager.workers[self.node]

-@argh.arg("streams", nargs="*")
+@argh.arg('channels', nargs='*', help='Channels to backfill')
 @argh.arg('--base-dir', help='Directory to which segments will be backfilled. Default is current working directory.')
-@argh.arg('--variants', help="Variants of each stream to backfill. Comma seperated if multiple. Default is 'source'.")
+@argh.arg('--qualities', help="Qualities of each channel to backfill. Comma separated if multiple. Default is 'source'.")
 @argh.arg('--metrics-port', help='Port for Prometheus stats. Default is 8002.')
 @argh.arg('--static-nodes', help='Nodes to always backfill from. Comma seperated if multiple. By default empty.')
 @argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
@@ -409,14 +409,14 @@
 @argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()')
 @argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.')
 @argh.arg('--recent-cutoff', help='Minimum age for a segment before we will backfill it, to prevent us backfilling segments we could have just downloaded ourselves instead. Expressed as number of seconds.')
-def main(streams, base_dir='.', variants='source', metrics_port=8002,
+def main(channels, base_dir='.', qualities='source', metrics_port=8002,
         static_nodes='', backdoor_port=0, start=None, run_once=False,
         node_file=None, node_database=None, localhost=socket.gethostname(),
         download_concurrency=5, recent_cutoff=120):
     """Backfiller service."""

-    variants = variants.split(',') if variants else []
-    variants = [variant.strip() for variant in variants]
+    qualities = qualities.split(',') if qualities else []
+    qualities = [quality.strip() for quality in qualities]

     static_nodes = static_nodes.split(',') if static_nodes else []
     static_nodes = [static_node.strip() for static_node in static_nodes]
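For context on the retry handling in `run()` above: consecutive failures back off the wait time, capped by `MAX_BACKOFF`, and `common.jitter` spreads retries out so peers don't all hammer a recovering node in lockstep. A sketch of plausible timing, assuming the delay doubles per failure from a `TIMEOUT` base and jitter is a small random scale factor (the exact formulas live in `common` and `run()`):

```python
import random

TIMEOUT = 5      # mirrors the module constant
MAX_BACKOFF = 4  # mirrors the module constant: cap on doublings

def jitter(interval):
    # assumed behaviour of common.jitter: scale by a random factor near 1
    return interval * random.uniform(0.9, 1.1)

def backoff_delay(failures):
    return jitter(TIMEOUT * 2 ** min(failures, MAX_BACKOFF))

# roughly 5s, 10s, 20s, 40s, 80s, then capped near 80s
print([round(backoff_delay(n), 1) for n in range(6)])
```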
@@ -434,9 +434,9 @@
     managers = []
     workers = []

-    for stream in streams:
-        logging.info('Starting backfilling {} with {} as variants to {}'.format(stream, ', '.join(variants), base_dir))
-        manager = BackfillerManager(base_dir, stream, variants, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency, recent_cutoff)
+    for channel in channels:
+        logging.info('Starting backfilling {} with qualities {} to {}'.format(channel, ', '.join(qualities), base_dir))
+        manager = BackfillerManager(base_dir, channel, qualities, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency, recent_cutoff)
         managers.append(manager)
         workers.append(gevent.spawn(manager.run))

diff --git a/docker-compose.jsonnet b/docker-compose.jsonnet
index 444ed04..5bfd9f5 100644
--- a/docker-compose.jsonnet
+++ b/docker-compose.jsonnet
@@ -134,7 +134,7 @@ command: [
             $.channel,
             "--base-dir", "/mnt",
-            "--variants", std.join(",", $.qualities),
+            "--qualities", std.join(",", $.qualities),
             "--static-nodes", std.join(",", $.peers),
             "--backdoor-port", std.toString($.backdoor_port),
         ],
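The docker-compose.jsonnet change keeps the generated config in step with the renamed flag: `std.join(",", $.qualities)` renders the qualities list as the comma-separated string that `main()` splits back apart. For illustration, a sketch of the per-channel fan-out `main()` then performs, one greenlet per manager (channel names hypothetical):

```python
import gevent

def run_manager(channel):
    # stand-in for BackfillerManager(base_dir, channel, qualities, ...).run()
    print('backfilling {}'.format(channel))

workers = [gevent.spawn(run_manager, c) for c in ['channel_a', 'channel_b']]
gevent.joinall(workers)
```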