Merge pull request #142 from ekimekim/mike/fixes

Misc fixes and improvements
Mike Lang authored 5 years ago, committed by GitHub
commit 48d4dddb1c

@ -3,6 +3,7 @@
import datetime
import errno
import hashlib
import itertools
import logging
import os
import random
@ -45,13 +46,12 @@ hash_mismatches = prom.Counter(
node_list_errors = prom.Counter(
'node_list_errors',
'Number of errors fetching a list of nodes',
['filename', 'database'],
)
backfill_errors = prom.Counter(
'backfill_errors',
'Number of errors backfilling',
['remote', 'channel'],
['remote'],
)
segments_deleted = prom.Counter(
@ -170,10 +170,10 @@ def get_remote_segment(base_dir, node, channel, quality, hour, missing_segment,
logger.info('Segment {}/{}/{} backfilled'.format(quality, hour, missing_segment))
def list_hours(node, channel, qualities, start=None):
def list_hours(node, channel, quality, start=None):
"""Return a list of all available hours from a node.
List all hours available from node/channel for each quality in qualities
List all hours available from node/channel
ordered from newest to oldest.
Keyword arguments:
@ -181,8 +181,7 @@ def list_hours(node, channel, qualities, start=None):
return hours more recent than that number of hours ago. If None (default),
all hours are returned."""
hour_lists = [list_remote_hours(node, channel, quality) for quality in qualities]
hours = list(set().union(*hour_lists))
hours = list_remote_hours(node, channel, quality)
hours.sort(reverse=True) #latest hour first
if start is not None:
@ -205,7 +204,7 @@ class BackfillerManager(object):
NODE_INTERVAL = 300 #seconds between updating list of nodes
def __init__(self, base_dir, channel, qualities, static_nodes=[],
def __init__(self, base_dir, channels, qualities, static_nodes=[],
start=None, delete_old=False, run_once=False, node_file=None,
node_database=None, localhost=None, download_concurrency=5,
recent_cutoff=120):
@ -213,7 +212,7 @@ class BackfillerManager(object):
Creates a manager for a given channel with specified qualities."""
self.base_dir = base_dir
self.channel = channel
self.channels = channels
self.qualities = qualities
self.static_nodes = static_nodes
self.start = start
@ -226,7 +225,7 @@ class BackfillerManager(object):
self.download_concurrency = download_concurrency
self.recent_cutoff = recent_cutoff
self.stopping = gevent.event.Event()
self.logger = logging.getLogger("BackfillerManager({})".format(channel))
self.logger = logging.getLogger("BackfillerManager")
self.workers = {} # {node url: worker}
def stop(self):
@ -258,8 +257,8 @@ class BackfillerManager(object):
else:
self.logger.info('Deleting hours older than {} hours ago'.format(self.start))
for quality in self.qualities:
hours = list_local_hours(self.base_dir, self.channel, quality)
for channel, quality in itertools.product(self.channels, self.qualities):
hours = list_local_hours(self.base_dir, channel, quality)
if not isinstance(self.start, datetime.datetime):
cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=self.start)
else:
@ -271,13 +270,13 @@ class BackfillerManager(object):
# deleting segments can take a bit of time but is less important
# than the actual backfilling so we yield
gevent.idle()
path = os.path.join(self.base_dir, self.channel, quality, hour)
path = os.path.join(self.base_dir, channel, quality, hour)
self.logger.info('Deleting {}'.format(path))
segments = list_local_segments(self.base_dir, self.channel, quality, hour)
segments = list_local_segments(self.base_dir, channel, quality, hour)
for segment in segments:
try:
os.remove(os.path.join(path, segment))
segments_deleted.labels(channel=self.channel, quality=quality, hour=hour).inc()
segments_deleted.labels(channel=channel, quality=quality, hour=hour).inc()
except OSError as e:
# ignore error when the file is already gone
if e.errno != errno.ENOENT:
@ -321,7 +320,7 @@ class BackfillerManager(object):
failures += 1
delay = common.jitter(TIMEOUT * 2**failures)
self.logger.exception('Getting nodes failed. Retrying in {:.0f} s'.format(delay))
node_list_errors.labels(filename=self.node_file).inc()
node_list_errors.inc()
self.stopping.wait(delay)
continue
exisiting_nodes = set(self.workers.keys())
@ -412,7 +411,7 @@ class BackfillerWorker(object):
self.base_dir = manager.base_dir
self.node = node
self.download_concurrency = manager.download_concurrency
self.channel = manager.channel
self.channels = manager.channels
self.qualities = manager.qualities
self.start = manager.start
self.run_once = manager.run_once
@ -421,7 +420,7 @@ class BackfillerWorker(object):
self.done = gevent.event.Event()
def __repr__(self):
return '<{} at 0x{:x} for {!r}/{!r}>'.format(type(self).__name__, id(self), self.node, self.channel)
return '<{} at 0x{:x} for {!r}>'.format(type(self).__name__, id(self), self.node)
__str__ = __repr__
def stop(self):
@ -429,15 +428,14 @@ class BackfillerWorker(object):
self.logger.info('Stopping')
self.stopping.set()
def backfill(self, hours):
def backfill(self):
"""Backfill from remote node.
Backfill from node/channel/qualities to base_dir/channel/qualities for
each hour in hours.
"""
for quality in self.qualities:
for hour in hours:
for channel, quality in itertools.product(self.channels, self.qualities):
for hour in list_hours(self.node, channel, quality, self.start):
# since backfilling can take a long time, recheck whether this
# hour is after the start
if self.start is not None:
@ -450,8 +448,8 @@ class BackfillerWorker(object):
self.logger.info('Backfilling {}/{}'.format(quality, hour))
local_segments = set(list_local_segments(self.base_dir, self.channel, quality, hour))
remote_segments = set(list_remote_segments(self.node, self.channel, quality, hour))
local_segments = set(list_local_segments(self.base_dir, channel, quality, hour))
remote_segments = set(list_remote_segments(self.node, channel, quality, hour))
missing_segments = list(remote_segments - local_segments)
# randomise the order of the segments to reduce the chance that
@ -466,7 +464,7 @@ class BackfillerWorker(object):
if self.stopping.is_set():
return
path = os.path.join(self.channel, quality, hour, missing_segment)
path = os.path.join(channel, quality, hour, missing_segment)
# test to see if file is a segment and get the segment's start time
try:
@ -489,7 +487,7 @@ class BackfillerWorker(object):
# start segment as soon as a pool slot opens up, then track it in workers
workers.append(pool.spawn(
get_remote_segment,
self.base_dir, self.node, self.channel, quality, hour, missing_segment, self.logger
self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger
))
# verify that all the workers succeeded. if any failed, raise the exception from
@ -498,7 +496,7 @@ class BackfillerWorker(object):
worker.get() # re-raise error, if any
self.logger.info('{} segments in {}/{} backfilled'.format(len(workers), quality, hour))
hours_backfilled.labels(remote=self.node, channel=self.channel, quality=quality).inc()
hours_backfilled.labels(remote=self.node, channel=channel, quality=quality).inc()
def run(self):
@ -508,7 +506,7 @@ class BackfillerWorker(object):
while not self.stopping.is_set():
try:
self.logger.info('Starting backfill')
self.backfill(list_hours(self.node, self.channel, self.qualities, self.start))
self.backfill()
self.logger.info('Backfill complete')
failures = 0 #reset failure count on a successful backfill
if not self.run_once:
@ -519,7 +517,7 @@ class BackfillerWorker(object):
failures += 1
delay = common.jitter(TIMEOUT * 2**failures)
self.logger.exception('Backfill failed. Retrying in {:.0f} s'.format(delay))
backfill_errors.labels(remote=self.node, channel=self.channel).inc()
backfill_errors.labels(remote=self.node).inc()
self.stopping.wait(delay)
if self.run_once:
@ -567,38 +565,19 @@ def main(channels, base_dir='.', qualities='source', metrics_port=8002,
common.install_stacksampler()
prom.start_http_server(metrics_port)
managers = []
workers = []
for channel in channels:
logging.info('Starting backfilling {} with {} as qualities to {}'.format(channel, ', '.join(qualities), base_dir))
manager = BackfillerManager(base_dir, channel, qualities, static_nodes,
start, delete_old, run_once, node_file, node_database,
localhost, download_concurrency, recent_cutoff)
managers.append(manager)
workers.append(gevent.spawn(manager.run))
logging.info('Starting backfilling {} with {} as qualities to {}'.format(', '.join(channels), ', '.join(qualities), base_dir))
manager = BackfillerManager(base_dir, channels, qualities, static_nodes,
start, delete_old, run_once, node_file, node_database,
localhost, download_concurrency, recent_cutoff)
def stop():
for manager in managers:
manager.stop()
manager.stop()
gevent.signal(signal.SIGTERM, stop)
if backdoor_port:
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
# Wait for any to die
gevent.wait(workers, count=1)
# If one has stopped, either:
# 1. stop() was called and all are stopping
# 2. one errored and we should stop all remaining and report the error
# Our behaviour in both cases is the same:
# 1. Tell all managers to gracefully stop
stop()
# 2. Wait (with timeout) until they've stopped
gevent.wait(workers)
# 3. Check if any of them failed. If they did, report it. If multiple
# failed, we report one arbitrarily.
for worker in workers:
worker.get()
manager.run()
logging.info('Gracefully stopped')
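
For context on the backfiller changes above: rather than one BackfillerManager per channel, a single manager and its workers now iterate over every (channel, quality) pair with itertools.product, listing hours per pair instead of unioning hours across qualities. A minimal sketch of that iteration pattern, with hypothetical stand-ins for the real list/backfill helpers:

import itertools

def backfill_node(node, channels, qualities, list_hours, backfill_hour):
    # Hypothetical sketch: walk every (channel, quality) pair for one remote
    # node, mirroring the itertools.product loops the diff introduces in
    # BackfillerManager and BackfillerWorker.
    for channel, quality in itertools.product(channels, qualities):
        for hour in list_hours(node, channel, quality):
            backfill_hour(node, channel, quality, hour)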

@ -3,7 +3,7 @@
set -eu
# cd to location of script
cd "$(dirname "$(readlink -f "$0")")"
cd "$(dirname "$(realpath "$0")")"
# Builds the docker images.
# Usage: ./build {COMPONENTS}, or just ./build to build all.

@ -1,9 +1,12 @@
"""Code for instrumenting requests calls. Requires requests, obviously."""
# absolute_import prevents "import requests" in this module from just importing itself
from __future__ import absolute_import
import urlparse
import requests
import requests.sessions
import prometheus_client as prom
from monotonic import monotonic
@ -27,7 +30,7 @@ request_concurrency = prom.Gauge(
['name', 'method', 'domain'],
)
class InstrumentedSession(requests.Session):
class InstrumentedSession(requests.sessions.Session):
"""A requests Session that automatically records metrics on requests made.
Users may optionally pass a 'metric_name' kwarg that will be included as the 'name' label.
"""
@ -45,7 +48,11 @@ class InstrumentedSession(requests.Session):
request_latency.labels(name, method, domain, "error").observe(latency)
raise
request_latency.labels(name, method, domain, response.status_code).observe(response.elapsed)
if 'content-length' in response.headers:
response_size.labels(name, method, domain, response.status_code).observe(response.headers['content-length'])
request_latency.labels(name, method, domain, response.status_code).observe(response.elapsed.total_seconds())
try:
content_length = int(response.headers['content-length'])
except (KeyError, ValueError):
pass # either not present or not valid
else:
response_size.labels(name, method, domain, response.status_code).observe(content_length)
return response
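
Two fixes here: request latency is now observed in seconds (response.elapsed is a timedelta, so .total_seconds() is needed for a numeric observation), and the content-length header is parsed defensively before being recorded. For reference, a minimal usage sketch of the instrumented session; the import path is an assumption, but the metric_name kwarg is the one described in the class docstring:

from common.requests import InstrumentedSession  # import path assumed

session = InstrumentedSession()
# Each request is timed and labelled by name/method/domain/status; after this
# change the latency histogram is fed response.elapsed.total_seconds().
resp = session.get('https://example.com/data', metric_name='example_fetch')
print(resp.status_code, len(resp.content))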

@ -91,8 +91,8 @@ def timed(name=None,
# can't safely assign to name inside closure, we use a new _name variable instead
_name = fn.__name__ if name is None else name
if name in metrics:
latency, cputime = metrics[name]
if _name in metrics:
latency, cputime = metrics[_name]
else:
latency = prom.Histogram(
"{}_latency".format(_name),
@ -106,9 +106,9 @@ def timed(name=None,
labels.keys() + ['error', 'type'],
buckets=buckets,
)
metrics[name] = latency, cputime
metrics[_name] = latency, cputime
if normalize:
normname = '{} normalized'.format(name)
normname = '{} normalized'.format(_name)
if normname in metrics:
normal_latency, normal_cputime = metrics[normname]
else:
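
The bug fixed here is a cache-key mismatch: when no explicit name is given, the decorator resolves _name from fn.__name__, but the metrics cache was still read and written under the original name (possibly None), so unnamed functions could share or try to re-register metrics. A minimal sketch of caching by the resolved name, assuming prometheus_client; names and labels are illustrative:

import prometheus_client as prom

_metrics = {}

def get_latency_histogram(_name, labelnames=()):
    # The same resolved key must be used for both the lookup and the insert,
    # otherwise a second decorated function tries to re-register the metric.
    if _name not in _metrics:
        _metrics[_name] = prom.Histogram(
            '{}_latency'.format(_name),
            'Wall clock time taken to execute {}'.format(_name),
            list(labelnames) + ['error', 'type'],
        )
    return _metrics[_name]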

@ -341,7 +341,7 @@ class Cutter(object):
if result.rowcount != 1:
# If we hadn't yet set finalizing, then this means an operator cancelled the job
# while we were cutting it. This isn't a problem.
if not finalize_begun:
if not finalize_begun[0]:
raise JobCancelled()
raise JobConsistencyError("No job with id {} and uploader {} when setting: {}".format(
job.id, self.name, ", ".join("{} = {!r}".format(k, v) for k, v in kwargs.items())
@ -444,7 +444,8 @@ class Cutter(object):
# Success! Set TRANSCODING or DONE and clear any previous error.
success_state = 'TRANSCODING' if upload_backend.needs_transcode else 'DONE'
set_row(state=success_state, video_id=video_id, video_link=video_link, error=None)
maybe_upload_time = {"upload_time": datetime.datetime.utcnow()} if success_state == 'DONE' else {}
set_row(state=success_state, video_id=video_id, video_link=video_link, error=None, **maybe_upload_time)
self.logger.info("Successfully cut and uploaded job {} as {}".format(format_job(job), video_link))
videos_uploaded.labels(video_channel=job.video_channel,
@ -617,7 +618,7 @@ def main(
conn = dbmanager.get_conn()
except Exception:
delay = common.jitter(10)
logging.info('Cannot connect to database. Retrying in {:.0f} s'.format(delay))
logging.warning('Cannot connect to database. Retrying in {:.0f} s'.format(delay), exc_info=True)
stop.wait(delay)
else:
# put it back so it gets reused on next get_conn()
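
Regarding the finalize_begun check fixed further up: finalize_begun appears to be a one-element list used as a mutable flag (so nested code can set it without nonlocal), which makes the bare list always truthy; the stored value has to be read as finalize_begun[0]. A minimal, self-contained illustration of that pattern, with hypothetical function names:

def make_finalize_flag():
    finalize_begun = [False]  # mutable container standing in for "nonlocal"

    def mark_finalizing():
        finalize_begun[0] = True

    def operator_cancelled(rowcount):
        # Wrong: "if not finalize_begun" can never fire, the list is truthy.
        # Right: test the stored value.
        return rowcount != 1 and not finalize_begun[0]

    return mark_finalizing, operator_cancelled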

@ -76,7 +76,7 @@ class UploadBackend(object):
# reasonable default if settings don't otherwise matter:
# high-quality mpegts, without wasting too much cpu on encoding
encoding_args = ['-c:v', 'libx264', '-preset', 'ultrafast', '-crf', '0', '-f', 'mpegts']
encoding_settings = ['-c:v', 'libx264', '-preset', 'ultrafast', '-crf', '0', '-f', 'mpegts']
encoding_streamable = True
def upload_video(self, title, description, tags, data):
@ -224,11 +224,10 @@ class Local(UploadBackend):
# ignore already-exists errors
def upload_video(self, title, description, tags, data):
video_id = uuid.uuid4()
video_id = str(uuid.uuid4())
# make title safe by removing offending characters, replacing with '-'
safe_title = re.sub('[^A-Za-z0-9_]', '-', title)
# If fast cut enabled, use .ts, otherwise use .mp4
ext = 'ts' if self.encoding_settings is None else 'mp4'
ext = 'ts'
filename = '{}-{}.{}'.format(safe_title, video_id, ext)
filepath = os.path.join(self.path, filename)
try:

@ -127,13 +127,13 @@
// The spreadsheet id and worksheet names for sheet sync to act on
sheet_id:: "your_id_here",
worksheets:: ["Tech Test & Pre-Show"] + ["Day %d" % n for n in std.range(1, 7)],
worksheets:: ["Tech Test & Preshow"] + ["Day %d" % n for n in std.range(1, 7)],
// Now for the actual docker-compose config
// The connection string for the database. Constructed from db_args.
db_connect:: std.join(" ", [
"%s=%s" % [key, $.db_args[key]]
"%s='%s'" % [key, $.db_args[key]]
for key in std.objectFields($.db_args)
]),
@ -206,6 +206,7 @@
"--base-dir", "/mnt",
"--backdoor-port", std.toString($.backdoor_port),
"--tags", std.join(",", $.video_tags),
"--name", $.localhost,
$.db_connect,
std.manifestJson($.cutter_config),
"/etc/wubloader-creds.json",

@ -126,6 +126,7 @@ class StreamsManager(object):
self.refresh_needed = gevent.event.Event() # set to tell main loop to refresh now
self.stopping = gevent.event.Event() # set to tell main loop to stop
self.important = important
self.master_playlist_log_level = logging.INFO if important else logging.DEBUG
if self.important:
self.FETCH_MIN_INTERVAL = self.IMPORTANT_FETCH_MIN_INTERVAL
@ -203,7 +204,7 @@ class StreamsManager(object):
"""Re-fetch master playlist and start new workers if needed"""
try:
# Fetch playlist. On soft timeout, retry.
self.logger.info("Fetching master playlist")
self.logger.log(self.master_playlist_log_level, "Fetching master playlist")
fetch_time = monotonic()
with soft_hard_timeout(self.logger, "fetching master playlist", self.FETCH_TIMEOUTS, self.trigger_refresh):
master_playlist = twitch.get_master_playlist(self.channel)
@ -226,8 +227,7 @@ class StreamsManager(object):
except Exception as e:
if isinstance(e, requests.HTTPError) and e.response is not None and e.response.status_code == 404:
# Log about important streams being down at info, but others at debug.
level = logging.INFO if self.important else logging.DEBUG
self.logger.log(level, "Stream is not up. Retrying.")
self.logger.log(self.master_playlist_log_level, "Stream is not up. Retrying.")
self.trigger_refresh()
else:
self.logger.exception("Failed to fetch master playlist")
@ -243,7 +243,7 @@ class StreamsManager(object):
self.MAX_WORKER_AGE - workers[-1].age()
for workers in self.stream_workers.values() if workers
] or [0]))
self.logger.info("Next master playlist refresh in at most {} sec".format(time_to_next_max_age))
self.logger.log(self.master_playlist_log_level, "Next master playlist refresh in at most {} sec".format(time_to_next_max_age))
# wait until refresh triggered, next max age reached, or we're stopping (whichever happens first)
gevent.wait([self.stopping, self.refresh_needed], timeout=time_to_next_max_age, count=1)
if not self.stopping.is_set():

@ -3,7 +3,7 @@
set -eu
# cd to location of script
cd "$(dirname "$(readlink -f "$0")")"
cd "$(dirname "$(realpath "$0")")"
# We generate first, and capture the output, to avoid overwriting the file on error.
# To avoid jsonnet needing to exist locally, we run it in a container.

@ -1,7 +1,7 @@
#!/bin/bash
# cd to location of script
cd "$(dirname "$(readlink -f "$0")")"
cd "$(dirname "$(realpath "$0")")"
TAG=$(git rev-parse --short HEAD)
if [ -n "$(git status --porcelain --untracked-files=no)" ]; then

@ -192,7 +192,7 @@ class CoverageChecker(object):
hours = [name for name in os.listdir(path) if not name.startswith('.')]
except OSError as e:
if e.errno == errno.ENOENT:
self.logger.warning('{} does not exist'.format(path))
self.logger.info('{} does not exist, skipping'.format(path))
continue
hours.sort()

@ -214,11 +214,11 @@ class SheetSync(object):
sql.SQL(", ").join(sql.Placeholder(col) for col in insert_cols),
)
query(self.conn, built_query, sheet_name=worksheet, **row)
rows_found(worksheet).inc()
rows_changed('insert', worksheet).inc()
rows_found.labels(worksheet).inc()
rows_changed.labels('insert', worksheet).inc()
return
rows_found(worksheet).inc()
rows_found.labels(worksheet).inc()
# Update database with any changed inputs
changed = [col for col in self.input_columns if row[col] != getattr(event, col)]
@ -236,7 +236,7 @@ class SheetSync(object):
) for col in changed
))
query(self.conn, built_query, **row)
rows_changed('input', worksheet).inc()
rows_changed.labels('input', worksheet).inc()
# Update sheet with any changed outputs
format_output = lambda v: '' if v is None else v # cast nulls to empty string
@ -251,7 +251,7 @@ class SheetSync(object):
row_index, self.column_map[col],
format_output(getattr(event, col)),
)
rows_changed('output', worksheet).inc()
rows_changed.labels('output', worksheet).inc()
# Set edit link if marked for editing and start/end set.
# This prevents accidents / clicking the wrong row and provides
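
The sheetsync fixes above are all one class of bug: prometheus_client counters are not callable, so label values must be selected with .labels(...) before calling .inc(). A minimal sketch, with label names assumed for illustration:

import prometheus_client as prom

rows_changed = prom.Counter(
    'rows_changed',
    'Number of rows changed in the database or sheet',
    ['type', 'worksheet'],  # label names assumed
)

# Correct: resolve the child metric via .labels(), then increment it.
rows_changed.labels('insert', 'Day 1').inc()
# Incorrect (the old code): rows_changed('insert', 'Day 1') raises TypeError.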

@ -40,7 +40,11 @@ pageSetup = function(isEditor) {
document.getElementById("AllowHoles").checked = data.allow_holes;
document.getElementById("uploaderWhitelist").value = (!!data.uploader_whitelist) ? data.uploader_whitelist.join(",") : "";
if (
(data.upload_locations.length > 0 && data.upload_location != data.upload_locations[0])
(
data.upload_locations.length > 0
&& data.upload_location != null
&& data.upload_location != data.upload_locations[0]
)
|| data.allow_holes
|| !!data.uploader_whitelist
) {
@ -253,6 +257,7 @@ thrimbletrimmerSubmit = function(state) {
setTimeout(() => { window.location.href = '/thrimbletrimmer/dashboard.html'; }, 500);
return
}
alert("Draft saved");
document.getElementById('SubmitButton').disabled = false;
}));
};

@ -57,8 +57,6 @@
To fix this, re-load the video in the desired time range (default: the last 10 minutes) by clicking Load Playlist.<br/>
Download Quality: <select id="qualityLevel"></select>
<input type="button" id="DownloadButton" value="Download this time range" onclick="thrimbletrimmerDownload()"/>
<a href="/thrimbletrimmer/dashboard.html">Go To Dashboard</a> |
<a href="/thrimbletrimmer/dashboard.html">Go To Editor</a>
<a id="HelpButton" style="float:right;" href="JavaScript:toggleHiddenPane('HelpPane');">Help</a>
<a id="UltrawideButton" style="float:right;margin-right:10px;" href="JavaScript:toggleUltrawide();">Ultrawide</a>
</div>

@ -24,6 +24,9 @@ body.ultrawide .my-player-dimensions { width:100% !important; }
.vjs-menu-button-popup .vjs-menu {
bottom:-3px;
}
.video-js .vjs-time-display {
width: 100px;
}
.video-js .vjs-picture-in-picture-control {
display: none;
}
