|
|
@ -21,6 +21,7 @@ from monotonic import monotonic
|
|
|
|
import twitch
|
|
|
|
import twitch
|
|
|
|
import common
|
|
|
|
import common
|
|
|
|
import common.dateutil
|
|
|
|
import common.dateutil
|
|
|
|
|
|
|
|
import common.requests
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
segments_downloaded = prom.Counter(
|
|
|
|
segments_downloaded = prom.Counter(
|
|
|
@ -285,7 +286,7 @@ class StreamWorker(object):
|
|
|
|
# with our connection pool.
|
|
|
|
# with our connection pool.
|
|
|
|
# This worker's SegmentGetters will use its session by default for performance,
|
|
|
|
# This worker's SegmentGetters will use its session by default for performance,
|
|
|
|
# but will fall back to a new one if something goes wrong.
|
|
|
|
# but will fall back to a new one if something goes wrong.
|
|
|
|
self.session = requests.Session()
|
|
|
|
self.session = common.requests.InstrumentedSession()
|
|
|
|
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
def __repr__(self):
|
|
|
|
return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality)
|
|
|
|
return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality)
|
|
|
@ -461,7 +462,7 @@ class SegmentGetter(object):
|
|
|
|
break
|
|
|
|
break
|
|
|
|
# Create a new session, so we don't reuse a connection from the old session
|
|
|
|
# Create a new session, so we don't reuse a connection from the old session
|
|
|
|
# which had an error / some other issue. This is mostly just out of paranoia.
|
|
|
|
# which had an error / some other issue. This is mostly just out of paranoia.
|
|
|
|
self.session = requests.Session()
|
|
|
|
self.session = common.requests.InstrumentedSession()
|
|
|
|
# if retry not set, wait for FETCH_RETRY first
|
|
|
|
# if retry not set, wait for FETCH_RETRY first
|
|
|
|
self.retry.wait(common.jitter(self.FETCH_RETRY))
|
|
|
|
self.retry.wait(common.jitter(self.FETCH_RETRY))
|
|
|
|
self.logger.debug("Getter is done")
|
|
|
|
self.logger.debug("Getter is done")
|
|
|
@ -522,7 +523,7 @@ class SegmentGetter(object):
|
|
|
|
self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path))
|
|
|
|
self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path))
|
|
|
|
with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set):
|
|
|
|
with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set):
|
|
|
|
with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set):
|
|
|
|
with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set):
|
|
|
|
resp = self.session.get(self.segment.uri, stream=True)
|
|
|
|
resp = self.session.get(self.segment.uri, stream=True, metric_name='get_segment')
|
|
|
|
# twitch returns 403 for expired segment urls, and 404 for very old urls where the original segment is gone.
|
|
|
|
# twitch returns 403 for expired segment urls, and 404 for very old urls where the original segment is gone.
|
|
|
|
# the latter can happen if we have a network issue that cuts us off from twitch for some time.
|
|
|
|
# the latter can happen if we have a network issue that cuts us off from twitch for some time.
|
|
|
|
if resp.status_code in (403, 404):
|
|
|
|
if resp.status_code in (403, 404):
|
|
|
|