time-related unbusening

pull/414/head
HeNine 3 years ago
parent a1a0a47a72
commit 022c271e07

@ -1,6 +1,6 @@
import logging import logging
import os import os
from datetime import timedelta, datetime from datetime import timedelta, datetime, timezone
from time import sleep from time import sleep
import argh import argh
@ -79,21 +79,50 @@ def main(channel, database="", base_dir=".",
gevent.signal_handler(signal.SIGTERM, stop) gevent.signal_handler(signal.SIGTERM, stop)
segments_end_time = None
while start_time < end_time: while start_time < end_time:
# If end time isn't given, use current time (plus fudge) to get a "live" segment list # If end time isn't given, use current time (plus fudge) to get a "live" segment list
segments = common.get_best_segments(segments_dir, segments = common.get_best_segments(segments_dir,
start_time, start_time,
end_time if end_time is not None else datetime.now() + timedelta(minutes=2)) end_time if end_time is not None else
# Remove initial None segment if it exists datetime.utcnow() + timedelta(minutes=2))
# Remove initial None segment (indicating segments start time is after desired start time) if it exists
if segments[0] is None: if segments[0] is None:
segments = segments[1:] segments = segments[1:]
# If there are no segments, we:
# - reached the live edge, or
# - reached a still-opening hole
# In both cases we can wait
if not segments: if not segments:
# We reached end of segments and are waiting for some new ones # If we have waited for more than 1min we flush the pipes to commit the last line of the stream,
sleep(30) # or commit the last line before a 1min hole.
if datetime.utcnow() - segments_end_time > timedelta(minutes=1):
finish_off_recognizer(recognizer, db_cursor)
logging.info("Waiting for new segments.") logging.info("Waiting for new segments.")
continue sleep(30)
continue # Retry
# If there are new segments, but they form a hole (judged somewhat arbitrarily as 2s) relative to
# segments_end_time, but the hole started less than 1min ago
elif segments_end_time is not None and \
segments[0].start - segments_end_time > timedelta(seconds=2) and \
datetime.utcnow() - segments_end_time < timedelta(minutes=1):
logging.info("Waiting for segments to be backfilled.")
sleep(30)
continue # Retry
# If we got new segments, but there was a hole that could not be backfilled
if segments_end_time is not None and \
segments[0].start - segments_end_time > timedelta(seconds=2):
finish_off_recognizer(recognizer, db_cursor)
# Recognizer is fresh or was reset
if recognizer.segments_start_time is None: if recognizer.segments_start_time is None:
recognizer.segments_start_time = segments[0].start recognizer.segments_start_time = segments[0].start
logging.info(f"Starting from: {segments[0].start}") logging.info(f"Starting from: {segments[0].start}")
@ -107,15 +136,5 @@ def main(channel, database="", base_dir=".",
finish_off_recognizer(recognizer, db_cursor) finish_off_recognizer(recognizer, db_cursor)
db_conn.close() db_conn.close()
exit(0) exit(0)
elif datetime.now() - segments_end_time > timedelta(minutes=5):
# Last seen segment ended more than five minutes ago. We hit a gap that will likely stay unfilled.
# Reset and jump to the other end of the gap.
finish_off_recognizer(recognizer, db_cursor)
else:
# End of live segment or a gap that is not old and might get filled.
# Give it a bit of time and continue.
# Note: if the gap is not filled within 30s, we jump to the next available segment.
sleep(30)
logging.info("Waiting for new segments.")
start_time = segments_end_time start_time = segments_end_time

Loading…
Cancel
Save