Added the ability to delete old hours

pull/129/head
Christopher Usher 5 years ago
parent 28ef77b5a7
commit 9d81569d98

@@ -6,6 +6,7 @@ import hashlib
import logging
import os
import random
import shutil
import signal
import socket
import urlparse
@@ -55,6 +56,24 @@ HOUR_FMT = '%Y-%m-%dT%H'
TIMEOUT = 5 #default timeout in seconds for remote requests or exceptions
MAX_BACKOFF = 4 #number of times to back off
def list_local_hours(base_dir, channel, quality):
	"""List hours in a given quality directory.

	For a given base_dir/channel/quality directory return a list of
	non-hidden files. If the directory path is not found, return an empty list.

	Based on restreamer.list_hours. We could just call restreamer.list_hours
	but this avoids HTTP/JSON overheads."""
	path = os.path.join(base_dir, channel, quality)
	try:
		return [name for name in os.listdir(path) if not name.startswith('.')]
	except OSError as e:
		if e.errno != errno.ENOENT:
			raise
		return []
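A minimal usage sketch of the new helper (not part of the commit), assuming hour directories are named with HOUR_FMT as elsewhere in this file; the base_dir and channel values are placeholders:

# Illustrative only: hour directory names use HOUR_FMT ('%Y-%m-%dT%H'),
# e.g. '2019-11-20T05'; hidden entries such as '.tmp' are skipped.
hours = list_local_hours('/storage', 'example_channel', 'source')
# e.g. ['2019-11-20T04', '2019-11-20T05']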
def list_local_segments(base_dir, channel, quality, hour):
	"""List segments in a given hour directory.
@@ -176,8 +195,9 @@ class BackfillerManager(object):
	NODE_INTERVAL = 300 #seconds between updating list of nodes
	def __init__(self, base_dir, channel, qualities, static_nodes=[],
-			start=None, run_once=False, node_file=None, node_database=None,
-			localhost=None, download_concurrency=5, recent_cutoff=120):
+			start=None, delete_before=0, run_once=False, node_file=None,
+			node_database=None, localhost=None, download_concurrency=5,
+			recent_cutoff=120):
"""Constructor for BackfillerManager. """Constructor for BackfillerManager.
Creates a manager for a given channel with specified qualities.""" Creates a manager for a given channel with specified qualities."""
@@ -186,6 +206,7 @@ class BackfillerManager(object):
		self.qualities = qualities
		self.static_nodes = static_nodes
		self.start = start
		self.delete_before = delete_before
		self.run_once = run_once
		self.node_file = node_file
		self.db_manager = None if node_database is None else database.DBManager(dsn=node_database)
@@ -218,6 +239,23 @@ class BackfillerManager(object):
"""Stop the worker for given node.""" """Stop the worker for given node."""
self.workers.pop(node).stop() self.workers.pop(node).stop()
	def delete(self):
		"""Delete hours older than self.delete_before hours ago."""
		self.logger.info('Deleting hours older than {} hours ago'.format(self.delete_before))
		# Hour directory names follow HOUR_FMT, so they can be compared as strings
		# against a cutoff rendered in the same format.
		cutoff = (datetime.datetime.utcnow() - datetime.timedelta(hours=self.delete_before)).strftime(HOUR_FMT)
		for quality in self.qualities:
			hours = list_local_hours(self.base_dir, self.channel, quality)
			hours = [hour for hour in hours if hour < cutoff]
			for hour in hours:
				self.logger.info('Deleting {}/{}'.format(quality, hour))
				shutil.rmtree(os.path.join(self.base_dir, self.channel, quality, hour))
		self.logger.info('Deleting old hours complete')
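A small self-contained sketch (not part of the commit) of why the string comparison above works: HOUR_FMT sorts lexicographically, so a cutoff rendered in the same format can be compared directly; the 48-hour value is only an example:

import datetime

HOUR_FMT = '%Y-%m-%dT%H'
cutoff = (datetime.datetime.utcnow() - datetime.timedelta(hours=48)).strftime(HOUR_FMT)
# Any hour name that compares less than the cutoff is more than 48 hours old.
old_hours = [hour for hour in ['2019-11-18T03', '2019-11-20T05'] if hour < cutoff]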
	def run(self):
		"""Stop and start workers based on results of get_nodes.
@@ -402,12 +440,12 @@ class BackfillerWorker(object):
			self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), quality, hour))
			hours_backfilled.labels(remote=self.node, channel=self.channel, quality=quality).inc()
	def run(self):
		self.logger.info('Starting')
		failures = 0
		while not self.stopping.is_set():
			try:
				self.logger.info('Starting backfill')
				self.backfill(list_hours(self.node, self.channel, self.qualities, self.start))
@@ -439,6 +477,7 @@ class BackfillerWorker(object):
@argh.arg('--static-nodes', help='Nodes to always backfill from. Comma separated if multiple. By default empty.')
@argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
@argh.arg('--start', help='If a datetime, only backfill hours after that datetime. If a number, backfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.')
@argh.arg('--delete-before', help='Delete hours older than this number of hours ago. If 0 (default) do not delete any hours.')
@argh.arg('--run-once', help='If True, backfill only once. By default False.')
@argh.arg('--node-file', help="Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.")
@argh.arg('--node-database', help='Postgres connection string for database to fetch a list of nodes from. Either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE . If None (default) do not get nodes from database.')
@@ -446,9 +485,9 @@ class BackfillerWorker(object):
@argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.')
@argh.arg('--recent-cutoff', help='Minimum age for a segment before we will backfill it, to prevent us backfilling segments we could have just downloaded ourselves instead. Expressed as number of seconds.')
def main(channels, base_dir='.', qualities='source', metrics_port=8002,
-		static_nodes='', backdoor_port=0, start=None, run_once=False,
-		node_file=None, node_database=None, localhost=socket.gethostname(),
-		download_concurrency=5, recent_cutoff=120):
+		static_nodes='', backdoor_port=0, start=None, delete_before=0,
+		run_once=False, node_file=None, node_database=None,
+		localhost=socket.gethostname(), download_concurrency=5, recent_cutoff=120):
"""Backfiller service.""" """Backfiller service."""
qualities = qualities.split(',') if qualities else [] qualities = qualities.split(',') if qualities else []
@@ -472,7 +511,9 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002,
	workers = []
	for channel in channels:
		logging.info('Starting backfilling {} with {} as qualities to {}'.format(channel, ', '.join(qualities), base_dir))
-		manager = BackfillerManager(base_dir, channel, qualities, static_nodes, start, run_once, node_file, node_database, localhost, download_concurrency, recent_cutoff)
+		manager = BackfillerManager(base_dir, channel, qualities, static_nodes,
+			start, delete_before, run_once, node_file, node_database,
+			localhost, download_concurrency, recent_cutoff)
		managers.append(manager)
		workers.append(gevent.spawn(manager.run))
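For reference, a hedged sketch of exercising the new option directly, assuming BackfillerManager is importable from this module; the path and channel are placeholders, not part of the commit:

# Equivalent in effect to passing --delete-before 48 on the command line:
# keep roughly the last 48 hours for this channel, then run one deletion pass.
manager = BackfillerManager('/storage', 'example_channel', ['source'], delete_before=48)
manager.delete()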
