|
|
@ -88,39 +88,25 @@ def main(channel, database="", base_dir=".",
|
|
|
|
end_time if end_time is not None else
|
|
|
|
end_time if end_time is not None else
|
|
|
|
datetime.utcnow() + timedelta(minutes=2))
|
|
|
|
datetime.utcnow() + timedelta(minutes=2))
|
|
|
|
|
|
|
|
|
|
|
|
# Remove initial None segment (indicating segments start time is after desired start time) if it exists
|
|
|
|
# If there is a hole at the start of the requested range because
|
|
|
|
if segments[0] is None:
|
|
|
|
if segments[0] is None:
|
|
|
|
segments = segments[1:]
|
|
|
|
# The hole is older than a minute, therefore
|
|
|
|
|
|
|
|
# - reset recognizer
|
|
|
|
# If there are no segments, we:
|
|
|
|
# - continue from existing segments
|
|
|
|
# - reached the live edge, or
|
|
|
|
if datetime.utcnow() - start_time > timedelta(minutes=1):
|
|
|
|
# - reached a still-opening hole
|
|
|
|
|
|
|
|
# In both cases we can wait
|
|
|
|
|
|
|
|
if not segments:
|
|
|
|
|
|
|
|
# If we have waited for more than 1min we flush the pipes to commit the last line of the stream,
|
|
|
|
|
|
|
|
# or commit the last line before a 1min hole.
|
|
|
|
|
|
|
|
if datetime.utcnow() - segments_end_time > timedelta(minutes=1):
|
|
|
|
|
|
|
|
finish_off_recognizer(recognizer, db_cursor)
|
|
|
|
finish_off_recognizer(recognizer, db_cursor)
|
|
|
|
|
|
|
|
|
|
|
|
logging.info("Waiting for new segments.")
|
|
|
|
# If the hole is less than a minute old, or if we don't have new segments: wait for segments
|
|
|
|
|
|
|
|
if datetime.utcnow() - start_time <= timedelta(minutes=1) or \
|
|
|
|
|
|
|
|
len(segments) < 2:
|
|
|
|
|
|
|
|
logging.info("Waiting for new or backfilled segments.")
|
|
|
|
sleep(30)
|
|
|
|
sleep(30)
|
|
|
|
|
|
|
|
|
|
|
|
continue # Retry
|
|
|
|
continue # Retry
|
|
|
|
# If there are new segments, but they form a hole (judged somewhat arbitrarily as 2s) relative to
|
|
|
|
|
|
|
|
# segments_end_time, but the hole started less than 1min ago
|
|
|
|
|
|
|
|
elif segments_end_time is not None and \
|
|
|
|
|
|
|
|
segments[0].start - segments_end_time > timedelta(seconds=2) and \
|
|
|
|
|
|
|
|
datetime.utcnow() - segments_end_time < timedelta(minutes=1):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.info("Waiting for segments to be backfilled.")
|
|
|
|
# Remove initial None segment (indicating segments start time is after desired start time) if it exists
|
|
|
|
sleep(30)
|
|
|
|
if segments[0] is None:
|
|
|
|
|
|
|
|
segments = segments[1:]
|
|
|
|
continue # Retry
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# If we got new segments, but there was a hole that could not be backfilled
|
|
|
|
|
|
|
|
if segments_end_time is not None and \
|
|
|
|
|
|
|
|
segments[0].start - segments_end_time > timedelta(seconds=2):
|
|
|
|
|
|
|
|
finish_off_recognizer(recognizer, db_cursor)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Recognizer is fresh or was reset
|
|
|
|
# Recognizer is fresh or was reset
|
|
|
|
if recognizer.segments_start_time is None:
|
|
|
|
if recognizer.segments_start_time is None:
|
|
|
|