diff --git a/downloader/downloader/main.py b/downloader/downloader/main.py index 4a42d2b..1f82aa6 100644 --- a/downloader/downloader/main.py +++ b/downloader/downloader/main.py @@ -21,6 +21,7 @@ from monotonic import monotonic import twitch import common import common.dateutil +import common.requests segments_downloaded = prom.Counter( @@ -285,7 +286,7 @@ class StreamWorker(object): # with our connection pool. # This worker's SegmentGetters will use its session by default for performance, # but will fall back to a new one if something goes wrong. - self.session = requests.Session() + self.session = common.requests.InstrumentedSession() def __repr__(self): return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality) @@ -461,7 +462,7 @@ class SegmentGetter(object): break # Create a new session, so we don't reuse a connection from the old session # which had an error / some other issue. This is mostly just out of paranoia. - self.session = requests.Session() + self.session = common.requests.InstrumentedSession() # if retry not set, wait for FETCH_RETRY first self.retry.wait(common.jitter(self.FETCH_RETRY)) self.logger.debug("Getter is done") @@ -522,7 +523,7 @@ class SegmentGetter(object): self.logger.debug("Downloading segment {} to {}".format(self.segment, temp_path)) with soft_hard_timeout(self.logger, "getting and writing segment", self.FETCH_FULL_TIMEOUTS, retry.set): with soft_hard_timeout(self.logger, "getting segment headers", self.FETCH_HEADERS_TIMEOUTS, retry.set): - resp = self.session.get(self.segment.uri, stream=True) + resp = self.session.get(self.segment.uri, stream=True, metric_name='get_segment') # twitch returns 403 for expired segment urls, and 404 for very old urls where the original segment is gone. # the latter can happen if we have a network issue that cuts us off from twitch for some time. if resp.status_code in (403, 404): diff --git a/downloader/downloader/twitch.py b/downloader/downloader/twitch.py index 94b60da..7124823 100644 --- a/downloader/downloader/twitch.py +++ b/downloader/downloader/twitch.py @@ -2,16 +2,18 @@ import logging import random -import requests - import hls_playlist +from common.requests import InstrumentedSession + logger = logging.getLogger(__name__) -def get_master_playlist(channel, session=requests): +def get_master_playlist(channel, session=None): """Get the master playlist for given channel from twitch""" + if session is None: + session = InstrumentedSession() resp = session.get( "https://api.twitch.tv/api/channels/{}/access_token.json".format(channel), params={'as3': 't'}, @@ -19,6 +21,7 @@ def get_master_playlist(channel, session=requests): 'Accept': 'application/vnd.twitchtv.v3+json', 'Client-ID': 'pwkzresl8kj2rdj6g7bvxl9ys1wly3j', }, + metric_name='get_access_token', ) resp.raise_for_status() # getting access token token = resp.json() @@ -42,6 +45,7 @@ def get_master_playlist(channel, session=requests): # in flux. Better to just blend in with the crowd for now. # "platform": "_" }, + metric_name='get_master_playlist', ) resp.raise_for_status() # getting master playlist playlist = hls_playlist.load(resp.text, base_uri=resp.url) @@ -96,7 +100,9 @@ def get_media_playlist_uris(master_playlist, target_qualities): return {name: variant.uri for name, variant in variants.items()} -def get_media_playlist(uri, session=requests): - resp = session.get(uri) +def get_media_playlist(uri, session=None): + if session is None: + session = InstrumentedSession() + resp = session.get(uri, metric_name='get_media_playlist') resp.raise_for_status() return hls_playlist.load(resp.text, base_uri=resp.url)