@ -15,6 +15,7 @@ import argh
import gevent
import gevent . backdoor
import gevent . event
import mysql . connector
import prometheus_client as prom
import requests
from monotonic import monotonic
@ -590,14 +591,15 @@ class SegmentGetter(object):
stat . set ( max ( stat . _value . get ( ) , timestamp ) ) # NOTE: not thread-safe but is gevent-safe
@argh.arg ( ' channels ' , nargs = " + " , help =
@argh.arg ( ' channels ' , nargs = " * " , help =
" Twitch channels to watch. Add a ' ! ' suffix to indicate they ' re expected to be always up. "
" This affects retry interval, error reporting and monitoring. "
)
@argh.arg ( ' --follow-game ' , action = ' append ' , default = [ ] , type = str ,
help = " Follow given game name and download all channels which stream that game " ,
@argh.arg ( ' --league-db ' , default = " necrobot-read:necrobot-read@condor.host/season_9 " , help =
" Connection string for database to check for racers and commentators. "
" Should be of form USER:PASSWORD@HOST/DATABASE. " ,
)
def main ( channels , base_dir = " . " , qualities = " source " , metrics_port = 8001 , backdoor_port = 0 , follow_game = None ) :
def main ( channels , base_dir = " . " , qualities = " source " , metrics_port = 8001 , backdoor_port = 0 , league_db = None ) :
qualities = qualities . split ( " , " ) if qualities else [ ]
managers = { }
@ -618,6 +620,14 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
if backdoor_port :
gevent . backdoor . BackdoorServer ( ( ' 127.0.0.1 ' , backdoor_port ) , locals = locals ( ) ) . start ( )
if league_db :
user , rest = league_db . split ( ' : ' , 1 )
password , rest = rest . split ( ' @ ' , 1 )
host , db = rest . split ( ' / ' , 1 )
conn = mysql . connector . connect ( host = host , user = user , password = password , database = db )
else :
conn = None
prev_check = 0
CHECK_INTERVAL = 60
while workers or not stopping . is_set ( ) :
@ -625,15 +635,32 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
for channel in managers . keys ( ) :
logging . info ( " Stopping channel {} for shutdown " . format ( channel ) )
managers . pop ( channel ) . stop ( )
elif time. time ( ) - prev_check > CHECK_INTERVAL :
logging . debug ( " Checking channels for games: {} " . format ( follow_game ) )
elif conn and time. time ( ) - prev_check > CHECK_INTERVAL :
logging . debug ( " Checking for new racers" )
prev_check = time . time ( )
try :
game_channels = list ( gevent . pool . Group ( ) . imap ( lambda g : list ( twitch . get_channels_for_game ( g ) ) , follow_game ) )
cur = conn . cursor ( )
# We're looking for all racers and commentators with an unfinished match
# that starts before 5min from now
cur . execute ( """
SELECT
racer_1_name ,
racer_2_name ,
cawmentator_name
FROM match_info
WHERE scheduled
AND scheduled_time < NOW ( ) + INTERVAL 5 MINUTE
AND NOT completed
""" )
channels_from_db = set ( )
for channels in cur . fetchall ( ) :
# filter out null commentators
channels = [ channel for channel in channels if channel ]
channels_from_db | = set ( channels )
except Exception :
logging . warning ( " Failed to fetch channels for games " , exc_info = True )
logging . warning ( " Failed to fetch c urrent rac es" , exc_info = True )
else :
new_channels = set ( channels ) . union ( * game_channels )
new_channels = set ( channels ) . union ( channels_from_db )
old_channels = set ( managers . keys ( ) )
for channel in new_channels - old_channels :
logging . info ( " Adding channel {} " . format ( channel ) )