From a1a6e4a020db274955e74cc63275f18e22312857 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Tue, 17 Oct 2023 18:25:01 +1100
Subject: [PATCH] Temporary debugging: Write out last 10 media playlists after fetching each segment

---
Review notes (ignored by git am, not part of the commit message):

* The history list kept HISTORY_SIZE + 1 playlists (the new entry plus up to
  HISTORY_SIZE old ones). It is now trimmed to HISTORY_SIZE entries in total,
  matching the "last 10" in the subject.
* os.makedirs() in write_history() now passes exist_ok=True (Python 3.2+), so
  an already-existing debug directory no longer raises.
* get_media_playlist() now returns a (raw text, parsed playlist) pair. Other
  callers and any Provider subclass that overrides it are not visible in this
  patch and should be checked.
* Line breaks and indentation were reconstructed from a whitespace-collapsed
  copy of this patch: 4-space indentation is assumed and blank context lines
  were placed according to the hunk line counts. Match the indentation to the
  target files before applying. The blob ids on the "index" lines are those
  of the original patch.

 downloader/downloader/main.py      | 30 ++++++++++++++++++++++++++++--
 downloader/downloader/providers.py |  3 ++-
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py
index 3b9ae49..322b84c 100644
--- a/downloader/downloader/main.py
+++ b/downloader/downloader/main.py
@@ -347,12 +347,15 @@ class StreamWorker(object):
 
     def _run(self):
         first = True
+        history = []
+        HISTORY_SIZE = 10
         while not self.stopping.is_set():
 
             self.logger.debug("Getting media playlist {}".format(self.url))
             try:
                 with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
-                    playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
+                    playlist_time = datetime.datetime.utcnow()
+                    raw_playlist, playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
             except Exception as e:
                 self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True)
                 self.trigger_new_worker()
@@ -368,6 +371,9 @@ class StreamWorker(object):
             # We successfully got the playlist at least once
             first = False
 
+            # Newest first; keep at most HISTORY_SIZE entries, including the new one.
+            history = [(playlist_time, raw_playlist)] + history[:HISTORY_SIZE - 1]
+
             # Start any new segment getters
             date = None # tracks date in case some segment doesn't include it
             for segment in playlist.segments:
@@ -393,6 +399,7 @@ class StreamWorker(object):
                         segment,
                         date,
                         self.map_cache,
+                        history,
                     )
                     gevent.spawn(self.getters[segment.uri].run)
                 if date is not None:
@@ -448,7 +455,7 @@ class SegmentGetter(object):
     # or so, to be paranoid we set it to considerably longer than that.
     GIVE_UP_TIMEOUT = 20 * 60
 
-    def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache):
+    def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache, history):
         self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
         self.base_dir = base_dir
         self.channel = channel
@@ -461,6 +468,8 @@ class SegmentGetter(object):
         self.done = gevent.event.Event() # set when file exists or we give up
         # Our parent's connection pool, but we'll replace it if there's any issues
         self.session = session
+        # Playlist history passed in by our creator, written out by write_history()
+        self.history = history
 
     def run(self):
         try:
@@ -612,6 +621,23 @@ class SegmentGetter(object):
         stat = latest_segment.labels(channel=self.channel, quality=self.quality)
         timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds()
         stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe
+        self.write_history(full_path)
+
+    def write_history(self, segment_path):
+        """Temporary debugging aid: write out the playlist history held by this getter.
+
+        Files go in BASE_DIR/playlist-debug/SEGMENT_PATH/, where SEGMENT_PATH is
+        segment_path relative to base_dir. Each file is named "N_TIMESTAMP", where
+        N is the entry's index in self.history and TIMESTAMP is its fetch time.
+        """
+        segment_path = os.path.relpath(segment_path, self.base_dir)
+        history_path = os.path.join(self.base_dir, "playlist-debug", segment_path)
+        # Tolerate the directory already existing instead of raising.
+        os.makedirs(history_path, exist_ok=True)
+        for n, (timestamp, playlist) in enumerate(self.history):
+            filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"))
+            path = os.path.join(history_path, filename)
+            common.atomic_write(path, playlist)
 
 
 def parse_channel(channel):
diff --git a/downloader/downloader/providers.py b/downloader/downloader/providers.py
index c3c2a02..af51263 100644
--- a/downloader/downloader/providers.py
+++ b/downloader/downloader/providers.py
@@ -29,7 +29,8 @@ class Provider:
             session = InstrumentedSession()
         resp = session.get(uri, metric_name='get_media_playlist')
         resp.raise_for_status()
-        return hls_playlist.load(resp.text, base_uri=resp.url)
+        playlist = resp.text
+        return playlist, hls_playlist.load(playlist, base_uri=resp.url)
 
 
 class URLProvider(Provider):