py3 fixes for common

pull/224/head
commit 3e69000058 (parent d03ae49eec)
Mike Lang authored and committed 3 years ago

@@ -95,7 +95,7 @@ def ensure_directory(path):
 	"""Create directory that contains path, as well as any parent directories,
 	if they don't already exist."""
 	dir_path = os.path.dirname(path)
-	os.mkdirs(dir_path, exist_ok=True)
+	os.makedirs(dir_path, exist_ok=True)


 def jitter(interval):
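Aside: the old line would have raised AttributeError, since os.mkdirs has never existed; the corrected call also depends on exist_ok, which is Python 3.2+ only. A minimal sketch of the fixed behaviour (the path is made up):

    import os

    # creates any missing intermediate directories, and is a no-op
    # if the directory already exists (exist_ok is Python 3.2+)
    os.makedirs('/tmp/example/nested/dirs', exist_ok=True)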
@@ -103,14 +103,3 @@ def jitter(interval):
 	smooth out patterns and prevent everything from retrying at the same time.
 	"""
 	return interval * (0.9 + 0.2 * random.random())
-
-
-def encode_strings(o):
-	"""Recurvisely handles unicode in json output."""
-	if isinstance(o, list):
-		return [encode_strings(x) for x in o]
-	if isinstance(o, dict):
-		return {k.encode('utf-8'): encode_strings(v) for k, v in o.items()}
-	if isinstance(o, unicode):
-		return o.encode('utf-8')
-	return o

@@ -1,9 +1,6 @@
 """Code for instrumenting requests calls. Requires requests, obviously."""
-
-# absolute_import prevents "import requests" in this module from just importing itself
-from __future__ import absolute_import
 import urlparse

 import requests.sessions
@@ -42,7 +39,7 @@ class InstrumentedSession(requests.sessions.Session):
 		start = monotonic() # we only use our own measured latency if an error occurs
 		try:
 			with request_concurrency.labels(name, method, domain).track_inprogress():
-				response = super(InstrumentedSession, self).request(method, url, *args, **kwargs)
+				response = super().request(method, url, *args, **kwargs)
 		except Exception:
 			latency = monotonic() - start
 			request_latency.labels(name, method, domain, "error").observe(latency)
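Aside: the zero-argument super() form is Python 3 only; it resolves to the same thing as the explicit py2 spelling but has no class name to keep in sync on rename. A minimal sketch with made-up class names:

    class Base:
        def request(self, method, url, *args, **kwargs):
            return (method, url)

    class Instrumented(Base):
        def request(self, method, url, *args, **kwargs):
            # equivalent to super(Instrumented, self).request(...) in py2
            return super().request(method, url, *args, **kwargs)

    assert Instrumented().request('GET', 'http://example.com') == ('GET', 'http://example.com')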

@@ -10,7 +10,6 @@ import json
 import logging
 import os
 import shutil
-import sys
 from collections import namedtuple
 from contextlib import closing
 from tempfile import TemporaryFile
@@ -22,10 +21,11 @@ from .stats import timed
 def unpadded_b64_decode(s):
-	"""Decode base64-encoded string that has had its padding removed"""
+	"""Decode base64-encoded string that has had its padding removed.
+	Note it takes a unicode and returns a bytes."""
 	# right-pad with '=' to multiple of 4
 	s = s + '=' * (- len(s) % 4)
-	return base64.b64decode(s, "-_")
+	return base64.b64decode(s.encode(), b"-_")


 class SegmentInfo(
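Aside: the second argument to base64.b64decode is altchars, the two characters standing in for '+' and '/', and in Python 3 both it and the input must be bytes, hence the .encode() and b"-_". A round-trip sketch of the urlsafe alphabet used here:

    import base64

    encoded = base64.urlsafe_b64encode(b'\xfb\xff').decode().rstrip('=')  # '-_8'
    padded = encoded + '=' * (-len(encoded) % 4)                          # '-_8='
    assert base64.b64decode(padded.encode(), b'-_') == b'\xfb\xff'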
@@ -91,8 +91,7 @@ def parse_segment_path(path):
 		)
 	except ValueError as e:
 		# wrap error but preserve original traceback
-		_, _, tb = sys.exc_info()
-		raise ValueError, ValueError("Bad path {!r}: {}".format(path, e)), tb
+		raise ValueError("Bad path {!r}: {}".format(path, e)).with_traceback(e.__traceback__)


 class ContainsHoles(Exception):
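Aside: .with_traceback() is the Python 3 replacement for the three-argument raise; it attaches the captured traceback so the wrapped error still points at the original raise site. Roughly, with a hypothetical parser:

    def parse_number(s):
        try:
            return int(s)
        except ValueError as e:
            # wrapped message, but the traceback shows where the parse failed
            raise ValueError("bad value {!r}: {}".format(s, e)).with_traceback(e.__traceback__)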
@@ -242,7 +241,7 @@ def best_segments_by_start(hour):
 	for name in segment_paths:
 		try:
 			parsed.append(parse_segment_path(os.path.join(hour, name)))
-		except ValueError as e:
+		except ValueError:
 			logging.warning("Failed to parse segment {!r}".format(os.path.join(hour, name)), exc_info=True)

 	for start_time, segments in itertools.groupby(parsed, key=lambda segment: segment.start):
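Aside: the "as e" binding was unused, so dropping it silences the lint warning; exc_info=True already pulls the active exception from the interpreter without needing a name for it:

    import logging

    try:
        int('not a number')
    except ValueError:
        # no binding needed; exc_info=True fetches the current exception
        logging.warning("Failed to parse", exc_info=True)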
@@ -282,6 +281,7 @@ def streams_info(segment):
 		'-of', 'json', '-show_streams', # get streams info as json
 		segment.path,
 	])
+	# output here is a bytes, but json.loads will accept it
 	return json.loads(output)['streams']
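Aside: the new comment relies on json.loads accepting bytes (UTF-8/16/32 encoded), which it does since Python 3.6, so the subprocess output needs no explicit decode:

    import json

    assert json.loads(b'{"streams": []}') == {'streams': []}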
@@ -419,15 +419,14 @@ def fast_cut_segments(segments, start, end):
 			for chunk in read_chunks(proc.stdout):
 				yield chunk
 			proc.wait()
-		except Exception:
-			ex, ex_type, tb = sys.exc_info()
+		except Exception as ex:
 			# try to clean up proc, ignoring errors
 			if proc is not None:
 				try:
 					proc.kill()
 				except OSError:
 					pass
-			raise ex, ex_type, tb
+			raise ex
 		else:
 			# check if ffmpeg had errors
 			if proc.returncode != 0:
@@ -445,7 +444,7 @@ def feed_input(segments, pipe):
 	"""Write each segment's data into the given pipe in order.
 	This is used to provide input to ffmpeg in a full cut."""
 	for segment in segments:
-		with open(segment.path) as f:
+		with open(segment.path, 'rb') as f:
 			try:
 				shutil.copyfileobj(f, pipe)
 			except OSError as e:
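Aside: Python 3's default open() mode decodes text, which would fail on (or mangle) raw video bytes; 'rb' passes them through untouched. The same copy outside the pipe context, with made-up filenames:

    import shutil

    with open('segment.ts', 'rb') as src, open('copy.ts', 'wb') as dst:
        shutil.copyfileobj(src, dst)  # raw bytes straight through, no decoding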

@@ -4,7 +4,6 @@ import functools
 import logging
 import os
 import signal
-import sys

 import gevent.lock
 from monotonic import monotonic
@@ -82,8 +81,9 @@ def timed(name=None,
 		normalized_buckets = buckets
 	# convert constant (non-callable) values into callables for consistency
 	labels = {
-		# there's a pyflakes bug here suggesting that v is undefined, but it isn't
-		k: v if callable(v) else (lambda *a, **k: v)
+		# need to create then call a function to properly bind v as otherwise it will
+		# always return the final label value.
+		k: v if callable(v) else (lambda v: (lambda *a, **k: v))(v)
 		for k, v in labels.items()
 	}
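Aside: the extra (lambda v: ...)(v) wrapper works around closure late binding; without it every generated callable would share the comprehension variable and return its final value. A quick demonstration:

    fns = [lambda: v for v in range(3)]
    print([f() for f in fns])  # [2, 2, 2] -- all closures see the last v

    fns = [(lambda v: (lambda: v))(v) for v in range(3)]
    print([f() for f in fns])  # [0, 1, 2] -- each call binds v immediately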
@@ -97,13 +97,13 @@ def timed(name=None,
 			latency = prom.Histogram(
 				"{}_latency".format(_name),
 				"Wall clock time taken to execute {}".format(_name),
-				labels.keys() + ['error'],
+				list(labels.keys()) + ['error'],
 				buckets=buckets,
 			)
 			cputime = prom.Histogram(
 				"{}_cputime".format(_name),
 				"Process-wide consumed CPU time during execution of {}".format(_name),
-				labels.keys() + ['error', 'type'],
+				list(labels.keys()) + ['error', 'type'],
 				buckets=buckets,
 			)
 			metrics[_name] = latency, cputime
@@ -115,13 +115,13 @@ def timed(name=None,
 			normal_latency = prom.Histogram(
 				"{}_latency_normalized".format(_name),
 				"Wall clock time taken to execute {} per unit of work".format(_name),
-				labels.keys() + ['error'],
+				list(labels.keys()) + ['error'],
 				buckets=normalized_buckets,
 			)
 			normal_cputime = prom.Histogram(
 				"{}_cputime_normalized".format(_name),
 				"Process-wide consumed CPU time during execution of {} per unit of work".format(_name),
-				labels.keys() + ['error', 'type'],
+				list(labels.keys()) + ['error', 'type'],
 				buckets=normalized_buckets,
 			)
 			metrics[normname] = normal_latency, normal_cputime
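Aside: the four hunks above are all the same fix; py2's dict.keys() returned a plain list, while the py3 view object doesn't support concatenation. With a made-up label set:

    labels = {'endpoint': 'cut'}
    # labels.keys() + ['error']      -> TypeError in Python 3
    list(labels.keys()) + ['error']  # ['endpoint', 'error']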
@@ -133,9 +133,9 @@ def timed(name=None,
 			try:
 				ret = fn(*args, **kwargs)
-			except Exception:
+			except Exception as e:
 				ret = None
-				error_type, error, tb = sys.exc_info()
+				error = e
 			else:
 				error = None
@@ -169,7 +169,7 @@ def timed(name=None,
 			if error is None:
 				return ret
-			raise error_type, error, tb # re-raise error with original traceback
+			raise error from None # re-raise error with original traceback

 		return wrapper
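Aside: the last two hunks are the py3 store-and-reraise pattern. The except variable is deleted when its block exits, so it must be copied to another name; re-raising later still preserves the original traceback, which travels on error.__traceback__, and 'from None' suppresses the implicit exception-chaining context. A sketch:

    def call(fn):
        try:
            return fn()
        except Exception as e:
            error = e  # 'e' itself is unbound once the except block exits
        # ...metrics/cleanup can run here...
        raise error from None  # traceback rides along on error.__traceback__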
@@ -255,6 +255,3 @@ def install_stacksampler(interval=0.005):
 	signal.signal(signal.SIGVTALRM, sample)
 	# deliver the first signal in INTERVAL seconds
 	signal.setitimer(signal.ITIMER_VIRTUAL, interval)
