py3 fixes for downloader

pull/224/head
Mike Lang 3 years ago committed by Mike Lang
parent 50231a5154
commit 6a98addac8

@ -5,7 +5,6 @@ import hashlib
import logging import logging
import os import os
import signal import signal
import sys
import uuid import uuid
from base64 import b64encode from base64 import b64encode
from contextlib import contextmanager from contextlib import contextmanager
@ -55,7 +54,7 @@ class TimedOutError(Exception):
@contextmanager @contextmanager
def soft_hard_timeout(logger, description, (soft_timeout, hard_timeout), on_soft_timeout): def soft_hard_timeout(logger, description, timeouts, on_soft_timeout):
"""Context manager that wraps a piece of code in a pair of timeouts, """Context manager that wraps a piece of code in a pair of timeouts,
a "soft" timeout and a "hard" one. If the block does not complete before a "soft" timeout and a "hard" one. If the block does not complete before
the soft timeout, the given on_soft_timeout() function is called in a new greenlet. the soft timeout, the given on_soft_timeout() function is called in a new greenlet.
@ -70,6 +69,7 @@ def soft_hard_timeout(logger, description, (soft_timeout, hard_timeout), on_soft
retrying, even as the original code continues to hold out in hope the call eventually retrying, even as the original code continues to hold out in hope the call eventually
succeeds. succeeds.
""" """
soft_timeout, hard_timeout = timeouts
# Finished is set after we're done to flag to the pending soft timeout callback # Finished is set after we're done to flag to the pending soft timeout callback
# that it shouldn't run. # that it shouldn't run.
finished = False finished = False
@ -214,7 +214,7 @@ class StreamsManager(object):
fetch_time = monotonic() fetch_time = monotonic()
with soft_hard_timeout(self.logger, "fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh): with soft_hard_timeout(self.logger, "fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh):
master_playlist = twitch.get_master_playlist(self.channel) master_playlist = twitch.get_master_playlist(self.channel)
new_urls = twitch.get_media_playlist_uris(master_playlist, self.stream_workers.keys()) new_urls = twitch.get_media_playlist_uris(master_playlist, list(self.stream_workers.keys()))
self.update_urls(fetch_time, new_urls) self.update_urls(fetch_time, new_urls)
for quality, workers in self.stream_workers.items(): for quality, workers in self.stream_workers.items():
# warn and retry if the url is missing # warn and retry if the url is missing
@ -260,9 +260,11 @@ class StreamsManager(object):
# wait min retry interval with jitter, unless we're stopping # wait min retry interval with jitter, unless we're stopping
self.stopping.wait(common.jitter(self.FETCH_MIN_INTERVAL)) self.stopping.wait(common.jitter(self.FETCH_MIN_INTERVAL))
self.logger.info("Stopping workers") self.logger.info("Stopping workers")
for workers in self.stream_workers.values(): stream_workers = list(self.stream_workers.values())
for workers in stream_workers:
for worker in workers: for worker in workers:
worker.stop() worker.stop()
for workers in stream_workers:
for worker in workers: for worker in workers:
worker.done.wait() worker.done.wait()
@ -510,7 +512,7 @@ class SegmentGetter(object):
partial: Segment is incomplete. Hash is included. partial: Segment is incomplete. Hash is included.
temp: Segment has not been downloaded yet. A random uuid is added. temp: Segment has not been downloaded yet. A random uuid is added.
""" """
arg = str(uuid.uuid4()) if type == "temp" else b64encode(hash.digest(), "-_").rstrip("=") arg = str(uuid.uuid4()) if type == "temp" else b64encode(hash.digest(), b"-_").encode().rstrip("=")
return "{}-{}-{}.ts".format(self.prefix, type, arg) return "{}-{}-{}.ts".format(self.prefix, type, arg)
def exists(self): def exists(self):
@ -519,10 +521,10 @@ class SegmentGetter(object):
try: try:
candidates = os.listdir(dirname) candidates = os.listdir(dirname)
except OSError as e: except OSError as e:
# on ENOENT (doesn't exist), return [] # on ENOENT (doesn't exist), return false
if e.errno != errno.ENOENT: if e.errno != errno.ENOENT:
raise raise
return [] return False
full_prefix = "{}-full".format(self.prefix) full_prefix = "{}-full".format(self.prefix)
return any(candidate.startswith(full_prefix) for candidate in candidates) return any(candidate.startswith(full_prefix) for candidate in candidates)
@ -563,17 +565,14 @@ class SegmentGetter(object):
for chunk in resp.iter_content(8192): for chunk in resp.iter_content(8192):
f.write(chunk) f.write(chunk)
hash.update(chunk) hash.update(chunk)
except Exception: except Exception as e:
# save original exception so we can re-raise it later even though we may be handling
# another exception in the interim
ex_type, ex, tb = sys.exc_info()
if file_created: if file_created:
partial_path = self.make_path("partial", hash) partial_path = self.make_path("partial", hash)
self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path)) self.logger.warning("Saving partial segment {} as {}".format(temp_path, partial_path))
common.rename(temp_path, partial_path) common.rename(temp_path, partial_path)
segments_downloaded.labels(type="partial", channel=self.channel, quality=self.quality).inc() segments_downloaded.labels(type="partial", channel=self.channel, quality=self.quality).inc()
segment_duration_downloaded.labels(type="partial", channel=self.channel, quality=self.quality).inc(self.segment.duration) segment_duration_downloaded.labels(type="partial", channel=self.channel, quality=self.quality).inc(self.segment.duration)
raise ex_type, ex, tb raise e
else: else:
request_duration = monotonic() - start_time request_duration = monotonic() - start_time
segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect" segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect"

Loading…
Cancel
Save