|
|
|
@ -401,6 +401,7 @@ class StreamWorker(object):
|
|
|
|
|
|
|
|
|
|
# Start any new segment getters
|
|
|
|
|
date = None # tracks date in case some segment doesn't include it
|
|
|
|
|
prev_segment = None
|
|
|
|
|
for segment in playlist.segments:
|
|
|
|
|
if segment.ad_reason:
|
|
|
|
|
self.logger.info("Ignoring ad segment: {}".format(segment.ad_reason))
|
|
|
|
@ -427,6 +428,7 @@ class StreamWorker(object):
|
|
|
|
|
segment_time_skew_non_zero_sum.labels(self.manager.channel, self.quality, f"{id(self):x}").inc(skew)
|
|
|
|
|
segment_time_skew_non_zero_count.labels(self.manager.channel, self.quality, f"{id(self):x}").inc()
|
|
|
|
|
if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew:
|
|
|
|
|
self.logger.warning(f"Suspicious skew of {skew}, triggering new worker: Expected {date} after {prev_segment}, got {new_date} for {segment}")
|
|
|
|
|
self.trigger_new_worker()
|
|
|
|
|
suspicious_skew = True
|
|
|
|
|
suspicious_skew_count.labels(self.manager.channel, self.quality).inc()
|
|
|
|
@ -448,6 +450,7 @@ class StreamWorker(object):
|
|
|
|
|
gevent.spawn(self.getters[segment.uri].run)
|
|
|
|
|
if date is not None:
|
|
|
|
|
date += datetime.timedelta(seconds=segment.duration)
|
|
|
|
|
prev_segment = segment
|
|
|
|
|
|
|
|
|
|
# Clean up any old segment getters.
|
|
|
|
|
# Note use of list() to make a copy to avoid modification-during-iteration
|
|
|
|
|