@@ -6,7 +6,6 @@ import hashlib
 import logging
 import os
 import random
-import shutil
 import signal
 import socket
 import urlparse
@@ -161,6 +160,7 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
 	segments_backfilled.labels(remote=node, channel=channel, quality=quality, hour=hour).inc()
 	logger.info('Segment {}/{}/{} backfilled'.format(quality, hour, missing_segment))
 
+
 def list_hours(node, channel, qualities, start=None):
 	"""Return a list of all available hours from a node.
@@ -190,12 +190,14 @@ class BackfillerManager(object):
 	The manager regularly calls get_nodes to get an up to date list of nodes. If no
 	worker exists for a node in this list or in the static_nodes list, the
 	manager starts one. If a worker corresponds to a node not in either list,
-	the manager stops it. If run_once, only backfill once."""
+	the manager stops it. If run_once, only backfill once. If delete_old,
+	delete hours older than start. The deletion is handled by the Manager as
+	having the Workers do it could lead to race conditions."""
 
 	NODE_INTERVAL = 300 #seconds between updating list of nodes
 
 	def __init__(self, base_dir, channel, qualities, static_nodes=[],
-			start=None, keep_hours=0, run_once=False, node_file=None,
+			start=None, delete_old=False, run_once=False, node_file=None,
 			node_database=None, localhost=None, download_concurrency=5,
 			recent_cutoff=120):
 		"""Constructor for BackfillerManager.
@@ -206,7 +208,7 @@ class BackfillerManager(object):
 		self.qualities = qualities
 		self.static_nodes = static_nodes
 		self.start = start
-		self.keep_hours = keep_hours
+		self.delete_old = delete_old
 		self.run_once = run_once
 		self.node_file = node_file
 		self.db_manager = None if node_database is None else database.DBManager(dsn=node_database)
@@ -239,20 +241,50 @@ class BackfillerManager(object):
 		"""Stop the worker for given node."""
 		self.workers.pop(node).stop()
 
-	def delete(self):
-		"""Delete hours older than self.keep_hours ago."""
-		self.logger.info('Deleting hours older than {} hours ago'.format(self.keep_hours))
+	def delete_hours(self):
+		"""Delete hours older than self.start ago."""
+		if isinstance(self.start, datetime.datetime):
+			self.logger.info('Deleting hours older than {}'.format(self.start.strftime(HOUR_FMT)))
+		else:
+			self.logger.info('Deleting hours older than {} hours ago'.format(self.start))
+
 		for quality in self.qualities:
 			hours = list_local_hours(self.base_dir, self.channel, quality)
-			cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=self.keep_hours)
-			hours = [hour for hour in hours if hour < cutoff]
+			if not isinstance(self.start, datetime.datetime):
+				cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=self.start)
+			else:
+				cutoff = self.start
+			hours = [hour for hour in hours if datetime.datetime.strptime(hour, HOUR_FMT) < cutoff]
 			hours.sort()
 			for hour in hours:
-				self.logger.info('Deleting {}/{}'.format(quality, hour))
-				shutil.rmtree(os.path.join(self.base_dir, self.channel, quality))
+				path = os.path.join(self.base_dir, self.channel, quality, hour)
+				self.logger.info('Deleting {}'.format(path))
+				segments = list_local_segments(self.base_dir, self.channel, quality, hour)
+				for segment in segments:
+					try:
+						os.remove(os.path.join(path, segment))
+					except OSError as e:
+						# ignore error when the file is already gone
+						if e.errno != errno.ENOENT:
+							raise
+
+				try:
+					os.rmdir(path)
+				except OSError as e:
+					# ignore error when the folder is already deleted
+					if e.errno == errno.ENOENT:
+						self.logger.warn('{} already deleted'.format(path))
+					# warn if not empty (will try to delete folder again next time)
+					elif e.errno == errno.ENOTEMPTY:
+						self.logger.warn('Failed to delete non-empty folder {}'.format(path))
+					else:
+						raise e
+				else:
+					self.logger.info('{} deleted'.format(path))
+
+		self.logger.info('Deleting old hours complete')
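
Because self.start can be either a datetime (a fixed point in time) or a float (a rolling number of hours ago), the cutoff above has to be resolved on every call. A standalone sketch of that resolution, assuming HOUR_FMT is the '%Y-%m-%dT%H' hour-directory format used with strftime/strptime above; resolve_cutoff and is_deletable are illustrative helpers, not functions from the source.

import datetime

HOUR_FMT = '%Y-%m-%dT%H'  # assumed hour-directory naming format

def resolve_cutoff(start):
	# Fixed datetime: delete everything strictly before it.
	if isinstance(start, datetime.datetime):
		return start
	# Float: a rolling window of `start` hours, recomputed at each call.
	return datetime.datetime.utcnow() - datetime.timedelta(hours=start)

def is_deletable(hour, start):
	# An hour directory is deletable when its parsed name predates the cutoff.
	return datetime.datetime.strptime(hour, HOUR_FMT) < resolve_cutoff(start)

Deleting segment files individually and then rmdir-ing the hour, while tolerating ENOENT and ENOTEMPTY, means two nodes (or a deleter racing a backfiller) can touch the same hour without crashing; a non-empty folder is simply retried on the next pass.
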
@@ -291,6 +323,9 @@ class BackfillerManager(object):
 			if self.run_once:
 				break
 
+			if self.delete_old and self.start:
+				self.delete_hours()
+
 			self.stopping.wait(common.jitter(self.NODE_INTERVAL))
 
 		#wait for all workers to finish
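
Deletion now runs on the same cadence as the node refresh, with the wait jittered so many nodes sharing the same NODE_INTERVAL don't all wake up, backfill, and delete in lockstep. common.jitter's implementation is not part of this diff; a plausible sketch of the idea only:

import random

def jitter(interval):
	# Illustrative only, not common.jitter's actual code: randomize the
	# wait within +/-10% so periodic work desynchronizes across nodes.
	return interval * (0.9 + 0.2 * random.random())
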
@@ -387,6 +422,14 @@ class BackfillerWorker(object):
 		for quality in self.qualities:
 			for hour in hours:
 
+				# since backfilling can take a long time, recheck whether this
+				# hour is after the start
+				if not isinstance(self.start, datetime.datetime):
+					start_hour = datetime.datetime.utcnow() - datetime.timedelta(hours=self.start)
+				else:
+					start_hour = self.start
+				if datetime.datetime.strptime(hour, HOUR_FMT) < start_hour:
+					break
 				self.logger.info('Backfilling {}/{}'.format(quality, hour))
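
The break only makes sense if hours are iterated newest-first: the first hour that falls before start_hour implies every remaining hour does too, so the rest of the quality can be skipped. A small sketch of that invariant; the hour names and cutoff are made up, and '%Y-%m-%dT%H' is the assumed hour format:

import datetime

hours = ['2019-01-02T05', '2019-01-02T04', '2019-01-02T03']  # newest first
start_hour = datetime.datetime(2019, 1, 2, 4)
for hour in hours:
	if datetime.datetime.strptime(hour, '%Y-%m-%dT%H') < start_hour:
		break  # '2019-01-02T03' and anything older is skipped
	print('would backfill', hour)
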
@@ -477,7 +520,7 @@ class BackfillerWorker(object):
 @argh.arg('--static-nodes', help='Nodes to always backfill from. Comma separated if multiple. By default empty.')
 @argh.arg('--backdoor-port', help='Port for gevent.backdoor access. By default disabled.')
 @argh.arg('--start', help='If a datetime, only backfill hours after that datetime. If a number, backfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.')
-@argh.arg('--keep-hours', help='Only keep this number of hours. If 0 (default) keep all hours.')
+@argh.arg('--delete-old', help='If True, delete hours older than start. By default False.')
 @argh.arg('--run-once', help='If True, backfill only once. By default False.')
 @argh.arg('--node-file', help="Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.")
 @argh.arg('--node-database', help='Postgres connection string for database to fetch a list of nodes from. Either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE . If None (default) do not get nodes from database.')
@@ -485,7 +528,7 @@ class BackfillerWorker(object):
 @argh.arg('--download-concurrency', help='Max number of concurrent segment downloads from a single node. Increasing this number may increase throughput but too high a value can overload the server or cause timeouts.')
 @argh.arg('--recent-cutoff', help='Minimum age for a segment before we will backfill it, to prevent us backfilling segments we could have just downloaded ourselves instead. Expressed as a number of seconds.')
 def main(channels, base_dir='.', qualities='source', metrics_port=8002,
-		static_nodes='', backdoor_port=0, start=None, keep_hours=0,
+		static_nodes='', backdoor_port=0, start=None, delete_old=False,
 		run_once=False, node_file=None, node_database=None,
 		localhost=socket.gethostname(), download_concurrency=5, recent_cutoff=120):
 	"""Backfiller service."""
@@ -499,13 +542,9 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002,
 		try:
 			start = float(start)
 			logging.info('Backfilling last {} hours'.format(start))
-			if keep_hours and start > keep_hours:
-				logging.warn('Keeping fewer hours ({}) than backfilling ({})'.format(keep_hours, start))
 		except ValueError:
 			start = dateutil.parse(start)
 			logging.info('Backfilling since {}'.format(start))
-			if keep_hours:
-				logging.warn('Only keeping {} hours when backfilling since {}'.format(keep_hours, start))
 
 	common.PromLogCountsHandler.install()
 	common.install_stacksampler()
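
main() accepts --start as either a number of hours or a datetime string, dispatching on whether float() succeeds. A self-contained sketch of the same dispatch; note the dateutil.parse in the diff may be a local helper, whereas this sketch substitutes the python-dateutil package, and parse_start is an invented name:

import datetime
from dateutil import parser as dateutil_parser

def parse_start(start):
	# Illustrative mirror of main()'s handling of --start.
	if start is None:
		return None  # backfill all hours
	try:
		return float(start)  # e.g. '48' -> the last 48 hours
	except ValueError:
		# not a number: treat it as a datetime string
		return dateutil_parser.parse(start)  # e.g. '2019-01-02T03:00'

assert parse_start('48') == 48.0
assert parse_start('2019-01-02T03:00') == datetime.datetime(2019, 1, 2, 3, 0)
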
@@ -516,7 +555,7 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002,
 	for channel in channels:
 		logging.info('Starting backfilling {} with {} as qualities to {}'.format(channel, ', '.join(qualities), base_dir))
 		manager = BackfillerManager(base_dir, channel, qualities, static_nodes,
-				start, keep_hours, run_once, node_file, node_database,
+				start, delete_old, run_once, node_file, node_database,
 				localhost, download_concurrency, recent_cutoff)
 		managers.append(manager)
 		workers.append(gevent.spawn(manager.run))