diff --git a/buscribe/buscribe/main.py b/buscribe/buscribe/main.py index ac69b22..162f0ae 100644 --- a/buscribe/buscribe/main.py +++ b/buscribe/buscribe/main.py @@ -1,6 +1,6 @@ import logging import os -from datetime import timedelta, datetime +from datetime import timedelta, datetime, timezone from time import sleep import argh @@ -79,21 +79,50 @@ def main(channel, database="", base_dir=".", gevent.signal_handler(signal.SIGTERM, stop) + segments_end_time = None + while start_time < end_time: # If end time isn't given, use current time (plus fudge) to get a "live" segment list segments = common.get_best_segments(segments_dir, start_time, - end_time if end_time is not None else datetime.now() + timedelta(minutes=2)) - # Remove initial None segment if it exists + end_time if end_time is not None else + datetime.utcnow() + timedelta(minutes=2)) + + # Remove initial None segment (indicating segments start time is after desired start time) if it exists if segments[0] is None: segments = segments[1:] + # If there are no segments, we: + # - reached the live edge, or + # - reached a still-opening hole + # In both cases we can wait if not segments: - # We reached end of segments and are waiting for some new ones - sleep(30) + # If we have waited for more than 1min we flush the pipes to commit the last line of the stream, + # or commit the last line before a 1min hole. + if datetime.utcnow() - segments_end_time > timedelta(minutes=1): + finish_off_recognizer(recognizer, db_cursor) + logging.info("Waiting for new segments.") - continue + sleep(30) + + continue # Retry + # If there are new segments, but they form a hole (judged somewhat arbitrarily as 2s) relative to + # segments_end_time, but the hole started less than 1min ago + elif segments_end_time is not None and \ + segments[0].start - segments_end_time > timedelta(seconds=2) and \ + datetime.utcnow() - segments_end_time < timedelta(minutes=1): + + logging.info("Waiting for segments to be backfilled.") + sleep(30) + + continue # Retry + + # If we got new segments, but there was a hole that could not be backfilled + if segments_end_time is not None and \ + segments[0].start - segments_end_time > timedelta(seconds=2): + finish_off_recognizer(recognizer, db_cursor) + # Recognizer is fresh or was reset if recognizer.segments_start_time is None: recognizer.segments_start_time = segments[0].start logging.info(f"Starting from: {segments[0].start}") @@ -107,15 +136,5 @@ def main(channel, database="", base_dir=".", finish_off_recognizer(recognizer, db_cursor) db_conn.close() exit(0) - elif datetime.now() - segments_end_time > timedelta(minutes=5): - # Last seen segment ended more than five minutes ago. We hit a gap that will likely stay unfilled. - # Reset and jump to the other end of the gap. - finish_off_recognizer(recognizer, db_cursor) - else: - # End of live segment or a gap that is not old and might get filled. - # Give it a bit of time and continue. - # Note: if the gap is not filled within 30s, we jump to the next available segment. - sleep(30) - logging.info("Waiting for new segments.") start_time = segments_end_time