|
|
|
@ -624,9 +624,6 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
|
|
|
|
|
user, rest = league_db.split(':', 1)
|
|
|
|
|
password, rest = rest.split('@', 1)
|
|
|
|
|
host, db = rest.split('/', 1)
|
|
|
|
|
conn = mysql.connector.connect(host=host, user=user, password=password, database=db)
|
|
|
|
|
else:
|
|
|
|
|
conn = None
|
|
|
|
|
|
|
|
|
|
prev_check = 0
|
|
|
|
|
CHECK_INTERVAL = 60
|
|
|
|
@ -635,10 +632,11 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
|
|
|
|
|
for channel in managers.keys():
|
|
|
|
|
logging.info("Stopping channel {} for shutdown".format(channel))
|
|
|
|
|
managers.pop(channel).stop()
|
|
|
|
|
elif conn and time.time() - prev_check > CHECK_INTERVAL:
|
|
|
|
|
logging.debug("Checking for new racers")
|
|
|
|
|
elif league_db and time.time() - prev_check > CHECK_INTERVAL:
|
|
|
|
|
logging.info("Checking for new racers")
|
|
|
|
|
prev_check = time.time()
|
|
|
|
|
try:
|
|
|
|
|
conn = mysql.connector.connect(host=host, user=user, password=password, database=db)
|
|
|
|
|
cur = conn.cursor()
|
|
|
|
|
# We're looking for all racers and commentators with an unfinished match
|
|
|
|
|
# that starts before 5min from now
|
|
|
|
@ -653,22 +651,33 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
|
|
|
|
|
AND NOT completed
|
|
|
|
|
""")
|
|
|
|
|
channels_from_db = set()
|
|
|
|
|
for channels in cur.fetchall():
|
|
|
|
|
for row_channels in cur.fetchall():
|
|
|
|
|
logging.info("Got row: {}".format(row_channels))
|
|
|
|
|
# filter out null commentators
|
|
|
|
|
channels = [channel for channel in channels if channel]
|
|
|
|
|
channels_from_db |= set(channels)
|
|
|
|
|
row_channels = [channel.lower() for channel in row_channels if channel]
|
|
|
|
|
channels_from_db |= set(row_channels)
|
|
|
|
|
except Exception:
|
|
|
|
|
logging.warning("Failed to fetch current races", exc_info=True)
|
|
|
|
|
else:
|
|
|
|
|
logging.info("Current races: {}".format(", ".join(list(channels_from_db))))
|
|
|
|
|
new_channels = set(channels).union(channels_from_db)
|
|
|
|
|
old_channels = set(managers.keys())
|
|
|
|
|
for channel in new_channels - old_channels:
|
|
|
|
|
logging.info("Adding channel {}".format(channel))
|
|
|
|
|
managers[channel] = StreamsManager(channel.rstrip('!'), base_dir, qualities, important=channel.endswith('!'))
|
|
|
|
|
managers[channel] = StreamsManager(
|
|
|
|
|
channel.rstrip('!'), base_dir, qualities,
|
|
|
|
|
important=channel.endswith('!') or channel in channels_from_db,
|
|
|
|
|
)
|
|
|
|
|
workers[channel] = gevent.spawn(managers[channel].run)
|
|
|
|
|
for channel in old_channels - new_channels:
|
|
|
|
|
logging.info("Stopping channel {}".format(channel))
|
|
|
|
|
managers.pop(channel).stop()
|
|
|
|
|
finally:
|
|
|
|
|
try:
|
|
|
|
|
cur.close()
|
|
|
|
|
conn.close()
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
for channel, worker in workers.items():
|
|
|
|
|
if not worker.ready():
|
|
|
|
|
continue
|
|
|
|
@ -683,6 +692,7 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
|
|
|
|
|
|
|
|
|
|
# Wait for next check, or for any worker to finish
|
|
|
|
|
timeout = max(0, prev_check + CHECK_INTERVAL - time.time())
|
|
|
|
|
logging.info("Waiting for {:.2f}s for any worker to finish".format(timeout))
|
|
|
|
|
gevent.wait([stopping] + workers.values(), count=1, timeout=timeout)
|
|
|
|
|
|
|
|
|
|
logging.info("Gracefully stopped")
|
|
|
|
|