From 7b1c8c6d20eafffa111b840c395a8eb830b0e5e0 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Tue, 17 Oct 2023 18:25:01 +1100 Subject: [PATCH] optionally enabled debugging: Write out last 10 media playlists after fetching each segment --- downloader/downloader/main.py | 38 ++++++++++++++++++++++++------ downloader/downloader/providers.py | 3 ++- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 35b0239..512b68d 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -141,7 +141,7 @@ class StreamsManager(object): FETCH_TIMEOUTS = 5, 30 - def __init__(self, provider, channel, base_dir, qualities, important=False): + def __init__(self, provider, channel, base_dir, qualities, important=False, history_size=0): self.provider = provider self.channel = channel self.logger = logging.getLogger("StreamsManager({})".format(channel)) @@ -222,7 +222,7 @@ class StreamsManager(object): self.logger.info("Ignoring worker start as we are stopping") return url_time, url = self.latest_urls[quality] - worker = StreamWorker(self, quality, url, url_time) + worker = StreamWorker(self, quality, url, url_time, self.history_size) self.stream_workers[quality].append(worker) gevent.spawn(worker.run) @@ -314,7 +314,7 @@ class StreamWorker(object): # See https://github.com/dbvideostriketeam/wubloader/issues/539 MAX_SEGMENT_TIME_SKEW = 0.01 - def __init__(self, manager, quality, url, url_time): + def __init__(self, manager, quality, url, url_time, history_size): self.manager = manager self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self))) self.quality = quality @@ -336,6 +336,9 @@ class StreamWorker(object): # We expect the map data to be very small so there is no eviction here. # {uri: data} self.map_cache = {} + # If enabled, playlist history is saved after each segment fetch, + # showing the last N playlist fetches up until the one that resulted in that fetch. + self.history_size = history_size def __repr__(self): return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality) @@ -374,12 +377,14 @@ class StreamWorker(object): def _run(self): first = True suspicious_skew = False + history = [] while not self.stopping.is_set(): self.logger.debug("Getting media playlist {}".format(self.url)) try: with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker): - playlist = self.manager.provider.get_media_playlist(self.url, session=self.session) + playlist_time = datetime.datetime.utcnow() + raw_playlist, playlist = self.manager.provider.get_media_playlist(self.url, session=self.session) except Exception as e: self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True) self.trigger_new_worker() @@ -395,6 +400,9 @@ class StreamWorker(object): # We successfully got the playlist at least once first = False + if self.history_size > 0: + history = [(playlist_time, raw_playlist)] + history[:self.history_size] + # Start any new segment getters date = None # tracks date in case some segment doesn't include it prev_segment = None @@ -441,6 +449,7 @@ class StreamWorker(object): date, suspicious_skew, self.map_cache, + history, ) gevent.spawn(self.getters[segment.uri].run) if date is not None: @@ -497,7 +506,7 @@ class SegmentGetter(object): # or so, to be paranoid we set it to considerably longer than that. GIVE_UP_TIMEOUT = 20 * 60 - def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache): + def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache, history): self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self))) self.base_dir = base_dir self.channel = channel @@ -511,6 +520,7 @@ class SegmentGetter(object): self.done = gevent.event.Event() # set when file exists or we give up # Our parent's connection pool, but we'll replace it if there's any issues self.session = session + self.history = history def run(self): try: @@ -670,6 +680,20 @@ class SegmentGetter(object): stat = latest_segment.labels(channel=self.channel, quality=self.quality) timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds() stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe + if self.history: + self.write_history(full_path) + + def write_history(self, segment_path): + segment_path = os.path.relpath(segment_path, self.base_dir) + history_path = os.path.join(self.base_dir, "playlist-debug", segment_path) + try: + os.makedirs(history_path) + except FileExistsError: + pass + for n, (timestamp, playlist) in enumerate(self.history): + filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")) + path = os.path.join(history_path, filename) + common.atomic_write(path, playlist) def parse_channel(channel): @@ -688,7 +712,7 @@ def parse_channel(channel): "This affects retry interval, error reporting and monitoring. " "Non-twitch URLs can also be given with the form CHANNEL[!]:TYPE:URL" ) -def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None): +def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None, playlist_debug=0): qualities = qualities.split(",") if qualities else [] twitch_auth_token = None @@ -709,7 +733,7 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor channel_qualities = ["source"] else: raise ValueError(f"Unknown type {type!r}") - manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important) + manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important, history_size=playlist_debug) managers.append(manager) def stop(): diff --git a/downloader/downloader/providers.py b/downloader/downloader/providers.py index f791a33..996144c 100644 --- a/downloader/downloader/providers.py +++ b/downloader/downloader/providers.py @@ -29,7 +29,8 @@ class Provider: session = InstrumentedSession() resp = session.get(uri, metric_name='get_media_playlist') resp.raise_for_status() - return hls_playlist.load(resp.text, base_uri=resp.url) + playlist = resp.text + return playlist, hls_playlist.load(playlist, base_uri=resp.url) class URLProvider(Provider):