diff --git a/common/common/__init__.py b/common/common/__init__.py
index ccd48ef..47bf044 100644
--- a/common/common/__init__.py
+++ b/common/common/__init__.py
@@ -95,7 +95,7 @@ def ensure_directory(path):
 	"""Create directory that contains path, as well as any parent directories,
 	if they don't already exist."""
 	dir_path = os.path.dirname(path)
-	os.mkdirs(dir_path, exist_ok=True)
+	os.makedirs(dir_path, exist_ok=True)
 
 
 def jitter(interval):
@@ -103,14 +103,3 @@ def jitter(interval):
 	smooth out patterns and prevent everything from retrying at the same time.
 	"""
 	return interval * (0.9 + 0.2 * random.random())
-
-
-def encode_strings(o):
-	"""Recurvisely handles unicode in json output."""
-	if isinstance(o, list):
-		return [encode_strings(x) for x in o]
-	if isinstance(o, dict):
-		return {k.encode('utf-8'): encode_strings(v) for k, v in o.items()}
-	if isinstance(o, unicode):
-		return o.encode('utf-8')
-	return o
diff --git a/common/common/requests.py b/common/common/requests.py
index 6e6363a..372c790 100644
--- a/common/common/requests.py
+++ b/common/common/requests.py
@@ -1,9 +1,6 @@
 """Code for instrumenting requests calls. Requires requests, obviously."""
 
-# absolute_import prevents "import requests" in this module from just importing itself
-from __future__ import absolute_import
-
 import urlparse
 
 import requests.sessions
@@ -42,7 +39,7 @@ class InstrumentedSession(requests.sessions.Session):
 		start = monotonic() # we only use our own measured latency if an error occurs
 		try:
 			with request_concurrency.labels(name, method, domain).track_inprogress():
-				response = super(InstrumentedSession, self).request(method, url, *args, **kwargs)
+				response = super().request(method, url, *args, **kwargs)
 		except Exception:
 			latency = monotonic() - start
 			request_latency.labels(name, method, domain, "error").observe(latency)
diff --git a/common/common/segments.py b/common/common/segments.py
index 00e8983..7a6e1da 100644
--- a/common/common/segments.py
+++ b/common/common/segments.py
@@ -10,7 +10,6 @@ import json
 import logging
 import os
 import shutil
-import sys
 from collections import namedtuple
 from contextlib import closing
 from tempfile import TemporaryFile
@@ -22,10 +21,11 @@ from .stats import timed
 
 
 def unpadded_b64_decode(s):
-	"""Decode base64-encoded string that has had its padding removed"""
+	"""Decode base64-encoded string that has had its padding removed.
+	Note it takes a unicode and returns a bytes."""
 	# right-pad with '=' to multiple of 4
 	s = s + '=' * (- len(s) % 4)
-	return base64.b64decode(s, "-_")
+	return base64.b64decode(s.encode(), b"-_")
 
 
 class SegmentInfo(
@@ -91,8 +91,7 @@ def parse_segment_path(path):
 		)
 	except ValueError as e:
 		# wrap error but preserve original traceback
-		_, _, tb = sys.exc_info()
-		raise ValueError, ValueError("Bad path {!r}: {}".format(path, e)), tb
+		raise ValueError("Bad path {!r}: {}".format(path, e)).with_traceback(e.__traceback__)
 
 
 class ContainsHoles(Exception):
@@ -242,7 +241,7 @@ def best_segments_by_start(hour):
 	for name in segment_paths:
 		try:
 			parsed.append(parse_segment_path(os.path.join(hour, name)))
-		except ValueError as e:
+		except ValueError:
 			logging.warning("Failed to parse segment {!r}".format(os.path.join(hour, name)), exc_info=True)
 
 	for start_time, segments in itertools.groupby(parsed, key=lambda segment: segment.start):
@@ -282,6 +281,7 @@ def streams_info(segment):
 		'-of', 'json', '-show_streams', # get streams info as json
 		segment.path,
 	])
+	# output here is a bytes, but json.loads will accept it
 	return json.loads(output)['streams']
 
 
@@ -419,15 +419,14 @@ def fast_cut_segments(segments, start, end):
 				for chunk in read_chunks(proc.stdout):
 					yield chunk
 				proc.wait()
-			except Exception:
-				ex, ex_type, tb = sys.exc_info()
+			except Exception as ex:
 				# try to clean up proc, ignoring errors
 				if proc is not None:
 					try:
 						proc.kill()
 					except OSError:
 						pass
-				raise ex, ex_type, tb
+				raise ex
 			else:
 				# check if ffmpeg had errors
 				if proc.returncode != 0:
@@ -445,7 +444,7 @@ def feed_input(segments, pipe):
 	"""Write each segment's data into the given pipe in order.
 	This is used to provide input to ffmpeg in a full cut."""
 	for segment in segments:
-		with open(segment.path) as f:
+		with open(segment.path, 'rb') as f:
 			try:
 				shutil.copyfileobj(f, pipe)
 			except OSError as e:
diff --git a/common/common/stats.py b/common/common/stats.py
index 40f783a..68777d3 100644
--- a/common/common/stats.py
+++ b/common/common/stats.py
@@ -4,7 +4,6 @@ import functools
 import logging
 import os
 import signal
-import sys
 
 import gevent.lock
 from monotonic import monotonic
@@ -82,8 +81,9 @@ def timed(name=None,
 		normalized_buckets = buckets
 	# convert constant (non-callable) values into callables for consistency
 	labels = {
-		# there's a pyflakes bug here suggesting that v is undefined, but it isn't
-		k: v if callable(v) else (lambda *a, **k: v)
+		# need to create then call a function to properly bind v as otherwise it will
+		# always return the final label value.
+		k: v if callable(v) else (lambda v: (lambda *a, **k: v))(v)
 		for k, v in labels.items()
 	}
 
@@ -97,13 +97,13 @@ def timed(name=None,
 			latency = prom.Histogram(
 				"{}_latency".format(_name),
 				"Wall clock time taken to execute {}".format(_name),
-				labels.keys() + ['error'],
+				list(labels.keys()) + ['error'],
 				buckets=buckets,
 			)
 			cputime = prom.Histogram(
 				"{}_cputime".format(_name),
 				"Process-wide consumed CPU time during execution of {}".format(_name),
-				labels.keys() + ['error', 'type'],
+				list(labels.keys()) + ['error', 'type'],
 				buckets=buckets,
 			)
 			metrics[_name] = latency, cputime
@@ -115,13 +115,13 @@ def timed(name=None,
 			normal_latency = prom.Histogram(
 				"{}_latency_normalized".format(_name),
 				"Wall clock time taken to execute {} per unit of work".format(_name),
-				labels.keys() + ['error'],
+				list(labels.keys()) + ['error'],
 				buckets=normalized_buckets,
 			)
 			normal_cputime = prom.Histogram(
 				"{}_cputime_normalized".format(_name),
 				"Process-wide consumed CPU time during execution of {} per unit of work".format(_name),
-				labels.keys() + ['error', 'type'],
+				list(labels.keys()) + ['error', 'type'],
 				buckets=normalized_buckets,
 			)
 			metrics[normname] = normal_latency, normal_cputime
@@ -133,9 +133,9 @@ def timed(name=None,
 
 			try:
 				ret = fn(*args, **kwargs)
-			except Exception:
+			except Exception as e:
 				ret = None
-				error_type, error, tb = sys.exc_info()
+				error = e
 			else:
 				error = None
 
@@ -169,7 +169,7 @@ def timed(name=None,
 
 			if error is None:
 				return ret
-			raise error_type, error, tb # re-raise error with original traceback
+			raise error from None # re-raise error with original traceback
 
 		return wrapper
 
@@ -255,6 +255,3 @@ def install_stacksampler(interval=0.005):
 	signal.signal(signal.SIGVTALRM, sample)
 	# deliver the first signal in INTERVAL seconds
 	signal.setitimer(signal.ITIMER_VIRTUAL, interval)
-
-
-
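
Note on the timed() label change in stats.py: the extra (lambda v: ...)(v) wrapper is there because a bare lambda in the comprehension closes over the comprehension variable itself, so every constant-label callable would report the final value. A minimal standalone sketch of that behaviour, not part of the diff and using illustrative label names only:

# Illustrative sketch only: why timed() wraps the constant-label lambda
# in an immediately-called outer function.
labels = {'x': 1, 'y': 2}

# Naive version: every lambda closes over the comprehension variable v,
# so after iteration they all see its final value (2).
unbound = {k: (lambda *a, **kw: v) for k, v in labels.items()}

# Wrapped version (same pattern as the diff): calling the outer lambda
# binds v per item, so each callable keeps its own value.
bound = {k: (lambda v: (lambda *a, **kw: v))(v) for k, v in labels.items()}

assert unbound['x']() == 2  # late-binding surprise
assert bound['x']() == 1    # bound correctly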