You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
wubloader/backfiller/backfiller/main.py

255 lines
8.3 KiB
Python

"""Download segments from other nodes to catch stuff this node missed."""
# TODO more logging, better exception handling
import datetime
import errno
import logging
import os
import random
import time
import uuid
import requests
import common
HOUR_FMT = '%Y-%m-%dT%H'
TIMEOUT = 5 #default timeout for remote requests
def get_nodes():
    """Return the addresses of the other wubloaders.

    Each address is a string of the form 'protocol://host:port/'.
    """
    # TODO: read a config file or query the database for the other nodes'
    # addresses, and make sure the local machine is excluded from the result.
    # For the prototype the address list is simply hardcoded.
    logging.info('Fetching list of other nodes')
    return ['toodles.videostrike.team:1337/']
def list_local_segments(base_dir, stream, variant, hour):
    """List the non-hidden files in base_dir/stream/variant/hour.

    Returns an empty list if that directory does not exist. Based on
    restreamer.list_segments; we could call restreamer.list_segments
    directly but this avoids the HTTP/JSON overheads.
    """
    hour_dir = os.path.join(base_dir, stream, variant, hour)
    try:
        entries = os.listdir(hour_dir)
    except OSError as e:
        # A missing hour directory simply means there are no local segments.
        if e.errno == errno.ENOENT:
            return []
        raise
    return [entry for entry in entries if not entry.startswith('.')]
def list_remote_hours(node, stream, variant, timeout=TIMEOUT):
    """Fetch the list of hours for stream/variant from a remote node.

    Thin wrapper around a call to restreamer.list_hours; returns the
    decoded JSON response.
    """
    url = '{}/files/{}/{}'.format(node, stream, variant)
    logging.debug('Getting list of hours from {}'.format(url))
    return requests.get(url, timeout=timeout).json()
def list_remote_segments(node, stream, variant, hour, timeout=TIMEOUT):
    """Fetch the list of segments in a given hour from a remote node.

    Thin wrapper around a call to restreamer.list_segments; returns the
    decoded JSON response.
    """
    url = '{}/files/{}/{}/{}'.format(node, stream, variant, hour)
    logging.debug('Getting list of segments from {}'.format(url))
    return requests.get(url, timeout=timeout).json()
def get_remote_segment(base_dir, node, stream, variant, hour, missing_segment,
        timeout=TIMEOUT):
    """Get a segment from a node.

    Fetches stream/variant/hour/missing_segment from node and puts it in
    base_dir/stream/variant/hour/missing_segment. The download goes to a
    uniquely-named temp file first and is renamed into place, so a partial
    download is never visible under the final name. If the segment already
    exists locally, this does not attempt to fetch it.

    Raises requests.HTTPError on a bad response; any other error during the
    download is re-raised after the temp file is removed.
    """
    path = os.path.join(base_dir, stream, variant, hour, missing_segment)
    logging.debug('Getting segment {}'.format(path))
    # The file may have been created since we listed the local segments;
    # skip the download in that case to avoid unnecessary copying.
    if os.path.exists(path):
        return

    dir_name = os.path.dirname(path)
    # Segment names look like '<date>-<duration>-<rest>'; reuse the first two
    # fields in the temp name, plus a uuid to avoid collisions between
    # concurrent fetchers.
    date, duration, _ = os.path.basename(path).split('-', 2)
    temp_name = "-".join([date, duration, "temp", str(uuid.uuid4())])
    temp_path = os.path.join(dir_name, "{}.ts".format(temp_name))
    common.ensure_directory(temp_path)

    try:
        uri = '{}/segments/{}/{}/{}/{}'.format(node, stream, variant, hour, missing_segment)
        resp = requests.get(uri, stream=True, timeout=timeout)
        resp.raise_for_status()
        # Segments are binary MPEG-TS data: open in binary mode ('wb'), not
        # text mode ('w'), which would corrupt the data (or raise on Python 3
        # since iter_content yields bytes).
        with open(temp_path, 'wb') as f:
            for chunk in resp.iter_content(8192):
                f.write(chunk)
    # try to get rid of the temp file if an exception is raised.
    except Exception:
        if os.path.exists(temp_path):
            os.remove(temp_path)
        raise
    logging.debug('Saving completed segment {} as {}'.format(temp_path, path))
    common.rename(temp_path, path)
def backfill(base_dir, stream, variants, hours=None, nodes=None):
    """Backfill this node from every known node in turn.

    For each node in nodes, copy node/stream/variants into
    base_dir/stream/variants. When nodes is None, get_nodes() supplies the
    node list. By default all hours are backfilled. A failure while
    backfilling one node is logged and the remaining nodes are still tried.
    """
    if nodes is None:
        nodes = get_nodes()

    # ideally do this in parallel
    for node in nodes:
        try:
            backfill_node(base_dir, node, stream, variants, hours)
        except Exception:
            logging.exception("Error while backfilling node {}".format(node))
def is_iterable(x):
    """Return True if x can be iterated over, False otherwise."""
    try:
        iter(x)
    except TypeError:
        return False
    else:
        return True
def backfill_node(base_dir, node, stream, variants, hours=None, segment_order='random', recent_cutoff=60):
    """Backfill from remote node.

    Backfill from node/stream/variants to base_dir/stream/variants.

    Keyword arguments:
    hours -- If None (default), backfill all available hours. If iterable,
        backfill only hours in iterable. Otherwise backfill the last N hours,
        starting with the latest.
    segment_order -- If 'random', randomise the order of segments (default).
        If 'forward', sort the segments in ascending order. If 'reverse',
        sort the segments in descending order. Otherwise, do not change the
        order of segments.
    recent_cutoff -- Skip backfilling segments younger than this number of
        seconds to prioritise letting the downloader grab these segments."""
    logging.info('Starting backfilling from {}'.format(node))

    if hours is None:
        # listing hours is a per-variant API call, but at this point we are
        # not dealing with a single variant: gather the available hours from
        # all variants and take the union.
        hours = set().union(*[
            list_remote_hours(node, stream, variant)
            for variant in variants
        ])
    elif is_iterable(hours):
        # coerce to list so the caller's iterable is fully realised and the
        # caller's own list (if it was one) is not mutated by sorting below
        hours = list(hours)
    else:
        n_hours = hours
        if n_hours < 1:
            raise ValueError('Number of hours has to be 1 or greater')
        now = datetime.datetime.utcnow()
        hours = [(now - i * datetime.timedelta(hours=1)).strftime(HOUR_FMT) for i in range(n_hours)]

    for variant in variants:
        for hour in hours:
            local_segments = set(list_local_segments(base_dir, stream, variant, hour))
            remote_segments = set(list_remote_segments(node, stream, variant, hour))
            missing_segments = list(remote_segments - local_segments)

            # useful if running in parallel so multiple nodes don't request
            # the same segment at the same time
            if segment_order == 'random':
                random.shuffle(missing_segments)
            elif segment_order == 'forward':
                missing_segments.sort()
            elif segment_order == 'reverse':
                missing_segments.sort(reverse=True)

            for missing_segment in missing_segments:
                path = os.path.join(stream, variant, hour, missing_segment)

                # test to see if file is a segment and get the segment's start time
                # NOTE(review): parse_segment_path appears to return a mapping
                # with a datetime under 'start' -- confirm against common
                try:
                    segment = common.parse_segment_path(path)
                except ValueError as e:
                    logging.warning('File {} invalid: {}'.format(path, e))
                    continue

                # to avoid getting in the downloader's way, ignore segments
                # less than recent_cutoff seconds old
                if datetime.datetime.utcnow() - segment['start'] < datetime.timedelta(seconds=recent_cutoff):
                    continue

                get_remote_segment(base_dir, node, stream, variant, hour, missing_segment)
    logging.info('Finished backfilling from {}'.format(node))
def main(base_dir='.', stream='', variants='', fill_wait=5, full_fill_wait=180, sleep_time=1):
    """Prototype backfiller service.

    Start by backfilling the last 3 hours of stream/variants from all nodes,
    then do a full backfill from all nodes. After that, check every
    sleep_time minutes whether more than fill_wait minutes have passed since
    the last backfill; if so, backfill the last 3 hours. Also check whether
    more than full_fill_wait minutes have passed since the last full
    backfill; if so, do a full backfill."""
    # TODO replace this with a more robust event based service and backfill from multiple nodes in parallel
    # stretch goal: provide an interface to trigger backfills manually
    # stretch goal: use the backfiller to monitor the restreamer
    variants = variants.split(',') if variants else []
    logging.info('Starting backfilling {} with {} as variants to {}'.format(
        stream, ', '.join(variants), base_dir))

    fill_start = datetime.datetime.now()
    full_fill_start = fill_start
    backfill(base_dir, stream, variants, 3)
    backfill(base_dir, stream, variants)

    # I'm sure there is a module that does this in a more robust way
    # but I understand this and it gives the behaviour I want
    while True:
        now = datetime.datetime.now()
        if now - full_fill_start > datetime.timedelta(minutes=full_fill_wait):
            backfill(base_dir, stream, variants)
            fill_start = now
            full_fill_start = now
        elif now - fill_start > datetime.timedelta(minutes=fill_wait):
            backfill(base_dir, stream, variants, 3)
            fill_start = now
        else:
            time.sleep(common.jitter(60 * sleep_time))