@ -15,6 +15,7 @@ import prometheus_client as prom
import common
import common
from common import dateutil
from common import dateutil
from common import database
segment_count_gauge = prom . Gauge (
segment_count_gauge = prom . Gauge (
@ -73,7 +74,8 @@ class CoverageChecker(object):
CHECK_INTERVAL = 60 #seconds between checking coverage
CHECK_INTERVAL = 60 #seconds between checking coverage
def __init__ ( self , channel , qualities , base_dir , first_hour , last_hour ) :
def __init__ ( self , channel , qualities , base_dir , first_hour , last_hour ,
make_page , connection_string ) :
""" Constructor for CoverageChecker.
""" Constructor for CoverageChecker.
Creates a checker for a given channel with specified qualities . """
Creates a checker for a given channel with specified qualities . """
@ -83,6 +85,8 @@ class CoverageChecker(object):
self . qualities = qualities
self . qualities = qualities
self . first_hour = first_hour
self . first_hour = first_hour
self . last_hour = last_hour
self . last_hour = last_hour
self . make_page = make_page
self . db_manager = None if connection_string is None else database . DBManager ( dsn = connection_string )
self . stopping = gevent . event . Event ( )
self . stopping = gevent . event . Event ( )
self . logger = logging . getLogger ( ' CoverageChecker( {} ) ' . format ( channel ) )
self . logger = logging . getLogger ( ' CoverageChecker( {} ) ' . format ( channel ) )
@ -176,6 +180,63 @@ class CoverageChecker(object):
os . rename ( temp_path , final_path )
os . rename ( temp_path , final_path )
self . logger . info ( ' Coverage map for {} created ' . format ( quality ) )
self . logger . info ( ' Coverage map for {} created ' . format ( quality ) )
def create_coverage_page ( self , quality ) :
nodes = { }
try :
connection = self . db_manager . get_conn ( )
host = [ s . split ( ' = ' ) [ - 1 ] for s in connection . dsn . split ( ) if ' host ' in s ] [ 0 ]
self . logger . info ( ' Fetching list of nodes from {} ' . format ( host ) )
results = database . query ( connection , """
SELECT name , url
FROM nodes
WHERE backfill_from """ )
for row in results :
nodes [ row . name ] = row . url
except :
self . logger . exception ( ' Getting nodes failed. ' , exc_info = True )
return
self . logger . info ( ' Nodes fetched: {} ' . format ( nodes . keys ( ) ) )
html = """ <!DOCTYPE html>
< html >
< head >
< meta charset = " utf-8 " >
< meta http - equiv = " refresh " content = " 30 " / >
< title > { 0 } { 1 } Segment Coverage Maps < / title >
< style >
html { { background - color : #222;}}
h1 { { color : #eee;
text - align : center ;
font - family : sans - serif ; } }
h3 { { color : #eee;
text - align : center ;
font - family : sans - serif ; } }
img { { display : block ;
margin - left : auto ;
margin - right : auto ; } }
< / style >
< / head >
< body >
< h1 > { 0 } { 1 } < / h1 > """ .format(self.channel, quality)
for node in sorted ( nodes . keys ( ) ) :
html + = """ <h3> {} </h3>
< img src = " {} /segments/coverage-maps/ {} _ {} _coverage.png " alt = " {} " >
""" .format(node, nodes[node], self.channel, quality, node)
html + = """ </body>
< / html > """
path_prefix = os . path . join ( self . base_dir , ' coverage-maps ' , ' {} _ {} ' . format ( self . channel , quality ) )
temp_path = ' {} _ {} .html ' . format ( path_prefix , uuid . uuid4 ( ) )
final_path = ' {} _coverage.html ' . format ( path_prefix )
common . ensure_directory ( temp_path )
with open ( temp_path , ' w ' ) as f :
f . write ( html )
os . rename ( temp_path , final_path )
self . logger . info ( ' Coverage page for {} created ' . format ( quality ) )
def run ( self ) :
def run ( self ) :
""" Loop over available hours for each quality, checking segment coverage. """
""" Loop over available hours for each quality, checking segment coverage. """
@ -422,6 +483,8 @@ class CoverageChecker(object):
self . logger . info ( ' {} / {} is empty ' . format ( quality , hour ) )
self . logger . info ( ' {} / {} is empty ' . format ( quality , hour ) )
self . create_coverage_map ( quality , all_hour_holes , all_hour_partials )
self . create_coverage_map ( quality , all_hour_holes , all_hour_partials )
if self . make_page :
self . create_coverage_page ( quality )
self . stopping . wait ( common . jitter ( self . CHECK_INTERVAL ) )
self . stopping . wait ( common . jitter ( self . CHECK_INTERVAL ) )
@ -431,10 +494,13 @@ class CoverageChecker(object):
@argh.arg ( ' --qualities ' , help = " Qualities of each channel to checked. Comma seperated if multiple. Default is ' source ' . " )
@argh.arg ( ' --qualities ' , help = " Qualities of each channel to checked. Comma seperated if multiple. Default is ' source ' . " )
@argh.arg ( ' --first-hour ' , help = ' First hour to compute coverage for. Default is earliest available hour. ' )
@argh.arg ( ' --first-hour ' , help = ' First hour to compute coverage for. Default is earliest available hour. ' )
@argh.arg ( ' --last-hour ' , help = ' Last hour to compute coverage for. Default is lastest available hour. ' )
@argh.arg ( ' --last-hour ' , help = ' Last hour to compute coverage for. Default is lastest available hour. ' )
@argh.arg ( ' --make-page ' , help = ' Make a html page displaying coverage maps for all nodes in database ' )
@argh.arg ( ' --connection-string ' , help = ' Postgres connection string, which is either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE ' )
@argh.arg ( ' --metrics-port ' , help = ' Port for Prometheus stats. Default is 8006. ' )
@argh.arg ( ' --metrics-port ' , help = ' Port for Prometheus stats. Default is 8006. ' )
@argh.arg ( ' --backdoor-port ' , help = ' Port for gevent.backdoor access. By default disabled. ' )
@argh.arg ( ' --backdoor-port ' , help = ' Port for gevent.backdoor access. By default disabled. ' )
def main ( channels , base_dir = ' . ' , qualities = ' source ' , first_hour = None ,
def main ( channels , base_dir = ' . ' , qualities = ' source ' , first_hour = None ,
last_hour = None , metrics_port = 8006 , backdoor_port = 0 ) :
last_hour = None , make_page = False , connection_string = None ,
metrics_port = 8006 , backdoor_port = 0 ) :
""" Segment coverage service """
""" Segment coverage service """
qualities = qualities . split ( ' , ' ) if qualities else [ ]
qualities = qualities . split ( ' , ' ) if qualities else [ ]
@ -452,7 +518,8 @@ def main(channels, base_dir='.', qualities='source', first_hour=None,
workers = [ ]
workers = [ ]
for channel in channels :
for channel in channels :
logging . info ( ' Starting coverage checks {} with {} as qualities in {} ' . format ( channel , ' , ' . join ( qualities ) , base_dir ) )
logging . info ( ' Starting coverage checks {} with {} as qualities in {} ' . format ( channel , ' , ' . join ( qualities ) , base_dir ) )
manager = CoverageChecker ( channel , qualities , base_dir , first_hour , last_hour )
manager = CoverageChecker ( channel , qualities , base_dir , first_hour ,
last_hour , make_page , connection_string )
managers . append ( manager )
managers . append ( manager )
workers . append ( gevent . spawn ( manager . run ) )
workers . append ( gevent . spawn ( manager . run ) )
@ -476,7 +543,7 @@ def main(channels, base_dir='.', qualities='source', first_hour=None,
# 2. Wait (with timeout) until they've stopped
# 2. Wait (with timeout) until they've stopped
gevent . wait ( workers )
gevent . wait ( workers )
# 3. Check if any of them failed. If they did, report it. If mulitple
# 3. Check if any of them failed. If they did, report it. If mulitple
# failed, we report one arbitrarily.
# failed, we report one arbitrarily.
for worker in workers :
for worker in workers :
worker . get ( )
worker . get ( )