Compare commits

...

8 Commits

Author SHA1 Message Date
Mike Lang 06d5835912 remove graphs service
We've decided not to use this in its current form.

Closes #545
3 days ago
Mike Lang d1d0b1bad2 tootbot: Save messages and images to disk 3 days ago
Mike Lang 270c38d50d docker-compose: add mount_segments option for bot_service() 3 days ago
Mike Lang 7b1c8c6d20 optionally enabled debugging: Write out last 10 media playlists after fetching each segment 3 days ago
Mike Lang 065c72f71f downloader: Re-connect when we see a time error over 0.01s
We have observed an issue on twitch where there will be a small time jump
(eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
and all subsequent segment timestamps will be out by this amount compared
to what other downloader instances see. Our workaround for this issue is to
watch for such gaps and:
1. trigger a worker refresh (which seems to fix the issue)
2. treat all subsequent segments as "suspect" so they are still saved
but only used if no other source is available.
3 days ago
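
Roughly, the skew check described above amounts to the following (a sketch with illustrative names; the actual implementation is in the downloader diff further down this page):

    import datetime

    MAX_SEGMENT_TIME_SKEW = 0.01  # seconds

    # Expected start of this segment: previous segment's start plus its duration.
    expected = prev_date + datetime.timedelta(seconds=prev_duration)
    skew = (expected - segment_date).total_seconds()
    if abs(skew) > MAX_SEGMENT_TIME_SKEW:
        trigger_new_worker()    # 1. refresh the worker, which seems to fix the issue
        suspicious_skew = True  # 2. mark this and all later segments as "suspect"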
Mike Lang c16d052f79 backfiller: Don't backfill segments that only differ from existing segment by 1ms
In the wild we've seen different servers get timestamps that differ by 1ms for segments
that are otherwise identical - same content, same duration.

The allowable fudge factor for segments is already 10ms, so having timing be 1ms different
between servers shouldn't cause any problems.
Worst case, there's a slight chance you'll get an adjacent frame when picking a cut point / thumbnail.
3 days ago
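
As a sketch of the rule (illustrative names; the real check is in the backfiller diff below), a remote segment is only skipped if every identifying field matches and the start times are within 1ms:

    EPSILON = 0.001  # 1ms
    MATCH_FIELDS = ("channel", "quality", "duration", "type", "hash")

    fields_match = all(getattr(remote, f) == getattr(local, f) for f in MATCH_FIELDS)
    close_enough = abs((remote.start - local.start).total_seconds()) <= EPSILON
    if fields_match and close_enough:
        skip_backfill = True  # in the real code this is a `continue` in the backfill loop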
Mike Lang 5c524717f2 Fix connection pool warnings by increasing pool size
in the backfiller and downloader, the services that make lots of outgoing HTTP requests.

We want the larger pool sizes anyway, as they improve performance in both services.
3 days ago
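
The fix amounts to mounting a larger urllib3 connection pool on the shared session. A minimal sketch with a plain requests.Session (the repo uses its own InstrumentedSession subclass, as shown in the diff below):

    import requests
    from requests.adapters import HTTPAdapter

    session = requests.Session()
    # requests defaults to a pool size of 10; with many concurrent requests the
    # pool overflows and urllib3 logs "connection pool is full" warnings.
    session.mount('https://', HTTPAdapter(pool_maxsize=100))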
Mike Lang d5fbcd5790 Don't have restreamer respond with metrics to non-restreamer metrics requests 3 days ago

@ -16,6 +16,7 @@ import argh
import gevent.backdoor
import gevent.pool
import prometheus_client as prom
from requests.adapters import HTTPAdapter
import common
from common import dateutil
@ -23,8 +24,10 @@ from common import database
from common.requests import InstrumentedSession
from common.segments import list_segment_files, unpadded_b64_decode
# Wraps all requests in some metric collection
# Wraps all requests in some metric collection and connection pooling
requests = InstrumentedSession()
adapter = HTTPAdapter(pool_maxsize=100)
requests.mount('https://', adapter)
segments_backfilled = prom.Counter(
'segments_backfilled',
@ -44,6 +47,12 @@ hash_mismatches = prom.Counter(
['remote', 'channel', 'quality', 'hour'],
)
small_difference_segments = prom.Gauge(
'small_difference_segments',
'Number of segments which were not pulled due to differing from existing segments by only a very small time difference',
['remote', 'channel', 'quality', 'hour'],
)
node_list_errors = prom.Counter(
'node_list_errors',
'Number of errors fetching a list of nodes',
@ -504,9 +513,21 @@ class BackfillerWorker(object):
# multiple workers request the same segment at the same time
random.shuffle(missing_segments)
if quality != 'chat':
MATCH_FIELDS = ("channel", "quality", "duration", "type", "hash")
EPSILON = 0.001
local_infos = []
for path in local_segments:
path = os.path.join(channel, quality, hour, path)
try:
local_infos.append(common.parse_segment_path(path))
except ValueError as e:
self.logger.warning('Local file {} could not be parsed: {}'.format(path, e))
pool = gevent.pool.Pool(self.download_concurrency)
workers = []
small_differences = 0
for missing_segment in missing_segments:
if self.stopping.is_set():
@ -542,6 +563,21 @@ class BackfillerWorker(object):
if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=self.recent_cutoff):
self.logger.debug('Skipping {} as too recent'.format(path))
continue
# if any local segment is within 1ms of the missing segment and otherwise identical, ignore it
found = None
for local_segment in local_infos:
# if any fields differ, no match
if not all(getattr(segment, field) == getattr(local_segment, field) for field in MATCH_FIELDS):
continue
# if time difference > epsilon, no match
if abs((segment.start - local_segment.start).total_seconds()) > EPSILON:
continue
found = local_segment
break
if found is not None:
self.logger.debug(f'Skipping {path} as within {EPSILON}s of identical segment {found.path}')
small_differences += 1
continue
# start segment as soon as a pool slot opens up, then track it in workers
workers.append(pool.spawn(
@ -549,6 +585,8 @@ class BackfillerWorker(object):
self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger
))
small_difference_segments.labels(self.node, channel, quality, hour).set(small_differences)
# verify that all the workers succeeded. if any failed, raise the exception from
# one of them arbitrarily.
for worker in workers:

@ -39,7 +39,6 @@
prizebot: false,
youtubebot: false,
bus_analyzer: false,
graphs: false,
},
// Twitch channels to capture. The first one will be used as the default channel in the editor.
@ -214,11 +213,13 @@
backfill_dirs:: ["emotes"],
// Enable saving of media (images and videos - this can be large), either globally or split into
// three options:
// options:
// - From chat messages (in chat_archiver.download_media)
// - From toots
// - From the image links column in the sheet (using sheetsync)
// - Backfilled from other nodes
download_media:: true,
download_toot_media:: $.download_media,
backfill_media:: $.download_media,
download_sheet_links:: $.download_media,
@ -329,6 +330,8 @@
// Obtain an access token by running: python -m zulip_bots.tootbot get-access-token
access_token: "",
},
[if $.download_toot_media then "media_dir"]: "/mnt/media",
output_path: "/mnt/tootbot.json",
args:: [],
},
@ -368,9 +371,6 @@
channel_id: "UCz5-PNQxaT4WtB_OMAwD85g", // DesertBusForHope
},
// template for donation data urls
donation_url_template:: "https://example.com/DB{}/DB{}.json",
// Extra options to pass via environment variables,
// eg. log level, disabling stack sampling.
env:: {
@ -756,15 +756,7 @@
environment: $.env,
},
[if $.enabled.graphs then "graphs"]: {
image: $.get_image("graphs"),
restart: "always",
command: [$.donation_url_template, "--base-dir", "/mnt/graphs"],
volumes: ["%s:/mnt" % $.segments_path],
environment: $.env,
},
local bot_service(name, config, args=[], subcommand=null) = {
local bot_service(name, config, args=[], subcommand=null, mount_segments=false) = {
image: $.get_image("zulip_bots"),
restart: "always",
entrypoint: ["python3", "-m", "zulip_bots.%s" % name]
@ -772,6 +764,7 @@
+ [std.manifestJson(config)]
+ args,
environment: $.env,
[if mount_segments then "volumes"]: ["%s:/mnt" % $.segments_path],
},
[if $.enabled.schedulebot then "schedulebot"]:
@ -787,7 +780,7 @@
[if $.enabled.tootbot then "tootbot"]:
bot_service("tootbot", $.tootbot + {
zulip+: { url: $.zulip_url },
}, $.tootbot.args, subcommand="main"),
}, $.tootbot.args, subcommand="main", mount_segments=true),
[if $.enabled.twitchbot then "twitchbot"]:
bot_service("twitchbot", $.twitchbot + {
@ -797,16 +790,12 @@
[if $.enabled.pubbot then "pubbot"]:
bot_service("pubbot", $.pubbot + {
zulip_url: $.zulip_url,
}, ["/mnt/pubnub-log.json"]) + {
volumes: ["%s:/mnt" % $.segments_path],
},
}, ["/mnt/pubnub-log.json"], mount_segments=true),
[if $.enabled.blogbot then "blogbot"]:
bot_service("blogbot", $.blogbot + {
zulip_url: $.zulip_url,
}, ["--save-dir", "/mnt/blogs"]) + {
volumes: ["%s:/mnt" % $.segments_path],
},
}, ["--save-dir", "/mnt/blogs"], mount_segments=true),
[if $.enabled.prizebot then "prizebot"]:
bot_service("prizebot", $.prizebot + {

@ -15,6 +15,7 @@ import gevent.backdoor
import gevent.event
import prometheus_client as prom
import requests
import requests.adapters
from monotonic import monotonic
import common
@ -49,6 +50,24 @@ ad_segments_ignored = prom.Counter(
["channel", "quality"],
)
suspicious_skew_count = prom.Counter(
"suspicious_skew_count",
"Number of times we've restarted a worker due to suspicious skew",
["channel", "quality"],
)
segment_time_skew_non_zero_sum = prom.Gauge(
"segment_time_skew_non_zero_sum",
"Sum of all observed segment skew amounts for worker",
["channel", "quality", "worker"],
)
segment_time_skew_non_zero_count = prom.Counter(
"segment_time_skew_non_zero_count",
"Count of segments with non-zero skew for worker",
["channel", "quality", "worker"],
)
class TimedOutError(Exception):
pass
@ -122,7 +141,7 @@ class StreamsManager(object):
FETCH_TIMEOUTS = 5, 30
def __init__(self, provider, channel, base_dir, qualities, important=False):
def __init__(self, provider, channel, base_dir, qualities, important=False, history_size=0):
self.provider = provider
self.channel = channel
self.logger = logging.getLogger("StreamsManager({})".format(channel))
self.history_size = history_size
@ -203,7 +222,7 @@ class StreamsManager(object):
self.logger.info("Ignoring worker start as we are stopping")
return
url_time, url = self.latest_urls[quality]
worker = StreamWorker(self, quality, url, url_time)
worker = StreamWorker(self, quality, url, url_time, self.history_size)
self.stream_workers[quality].append(worker)
gevent.spawn(worker.run)
@ -290,7 +309,12 @@ class StreamWorker(object):
FETCH_RETRY_INTERVAL = 1
FETCH_POLL_INTERVAL = 2
def __init__(self, manager, quality, url, url_time):
# Max difference between a segment's time + duration and the next segment's time (in seconds)
# before we consider this "suspicious" and trigger a refresh.
# See https://github.com/dbvideostriketeam/wubloader/issues/539
MAX_SEGMENT_TIME_SKEW = 0.01
def __init__(self, manager, quality, url, url_time, history_size):
self.manager = manager
self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self)))
self.quality = quality
@ -305,11 +329,16 @@ class StreamWorker(object):
# This worker's SegmentGetters will use its session by default for performance,
# but will fall back to a new one if something goes wrong.
self.session = common.requests.InstrumentedSession()
adapter = requests.adapters.HTTPAdapter(pool_maxsize=100)
self.session.mount('https://', adapter)
# Map cache is a simple cache to avoid re-downloading the same map URI for every segment,
# since it's generally the same but may occasionally change.
# We expect the map data to be very small so there is no eviction here.
# {uri: data}
self.map_cache = {}
# If enabled, playlist history is saved after each segment fetch,
# showing the last N playlist fetches up until the one that resulted in that fetch.
self.history_size = history_size
def __repr__(self):
return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality)
@ -347,12 +376,15 @@ class StreamWorker(object):
def _run(self):
first = True
suspicious_skew = False
history = []
while not self.stopping.is_set():
self.logger.debug("Getting media playlist {}".format(self.url))
try:
with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
playlist_time = datetime.datetime.utcnow()
raw_playlist, playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
except Exception as e:
self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True)
self.trigger_new_worker()
@ -368,8 +400,12 @@ class StreamWorker(object):
# We successfully got the playlist at least once
first = False
if self.history_size > 0:
history = [(playlist_time, raw_playlist)] + history[:self.history_size]
# Start any new segment getters
date = None # tracks date in case some segment doesn't include it
prev_segment = None
for segment in playlist.segments:
if segment.ad_reason:
self.logger.info("Ignoring ad segment: {}".format(segment.ad_reason))
@ -380,7 +416,26 @@ class StreamWorker(object):
self.manager.mark_working(self)
if segment.date:
date = common.dateutil.parse(segment.date)
new_date = common.dateutil.parse(segment.date)
if date is not None:
# We have observed an issue on twitch where there will be a small time jump
# (eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
# and all subsequent segment timestamps will be out by this amount compared
# to what other downloader instances see. Our workaround for this issue is to
# watch for such gaps and:
# 1. trigger a worker refresh (which seems to fix the issue)
# 2. treat all subsequent segments as "suspect" so they are still saved
# but only used if no other source is available.
skew = (date - new_date).total_seconds()
if skew != 0:
segment_time_skew_non_zero_sum.labels(self.manager.channel, self.quality, f"{id(self):x}").inc(skew)
segment_time_skew_non_zero_count.labels(self.manager.channel, self.quality, f"{id(self):x}").inc()
if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew:
self.logger.warning(f"Suspicious skew of {skew}, triggering new worker: Expected {date} after {prev_segment}, got {new_date} for {segment}")
self.trigger_new_worker()
suspicious_skew = True
suspicious_skew_count.labels(self.manager.channel, self.quality).inc()
date = new_date
if segment.uri not in self.getters:
if date is None:
raise ValueError("Cannot determine date of segment")
@ -392,11 +447,14 @@ class StreamWorker(object):
self.quality,
segment,
date,
suspicious_skew,
self.map_cache,
history,
)
gevent.spawn(self.getters[segment.uri].run)
if date is not None:
date += datetime.timedelta(seconds=segment.duration)
prev_segment = segment
# Clean up any old segment getters.
# Note use of list() to make a copy to avoid modification-during-iteration
@ -448,19 +506,21 @@ class SegmentGetter(object):
# or so, to be paranoid we set it to considerably longer than that.
GIVE_UP_TIMEOUT = 20 * 60
def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache):
def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache, history):
self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
self.base_dir = base_dir
self.channel = channel
self.quality = quality
self.segment = segment
self.date = date
self.suspect = suspect
self.map_cache = map_cache
self.prefix = self.make_path_prefix()
self.retry = None # Event, set to begin retrying
self.done = gevent.event.Event() # set when file exists or we give up
# Our parent's connection pool, but we'll replace it if there's any issues
self.session = session
self.history = history
def run(self):
try:
@ -517,6 +577,12 @@ class SegmentGetter(object):
Type may be:
full: Segment is complete. Hash is included.
suspect: Segment appears to be complete, but we suspect it is not. Hash is included.
This currently triggers on two conditions:
- If a segment takes a very long time to download, which we've observed to result in
partial files even though they appeared to end correctly.
- If the StreamWorker has encountered a time gap, then we suspect this segment to be
mis-timed. We have observed this where there is a small (~0.5s) time jump, then
all segments are consistently off by that amount compared to other nodes until refresh.
partial: Segment is incomplete. Hash is included.
temp: Segment has not been downloaded yet. A random uuid is added.
"""
@ -601,7 +667,9 @@ class SegmentGetter(object):
raise e
else:
request_duration = monotonic() - start_time
segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect"
segment_type = "full"
if self.suspect or request_duration >= self.FETCH_SUSPECT_TIME:
segment_type = "suspect"
full_path = self.make_path(segment_type, hash)
self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
common.rename(temp_path, full_path)
@ -612,6 +680,20 @@ class SegmentGetter(object):
stat = latest_segment.labels(channel=self.channel, quality=self.quality)
timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds()
stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe
if self.history:
self.write_history(full_path)
def write_history(self, segment_path):
segment_path = os.path.relpath(segment_path, self.base_dir)
history_path = os.path.join(self.base_dir, "playlist-debug", segment_path)
try:
os.makedirs(history_path)
except FileExistsError:
pass
for n, (timestamp, playlist) in enumerate(self.history):
filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"))
path = os.path.join(history_path, filename)
common.atomic_write(path, playlist)
def parse_channel(channel):
@ -630,7 +712,7 @@ def parse_channel(channel):
"This affects retry interval, error reporting and monitoring. "
"Non-twitch URLs can also be given with the form CHANNEL[!]:TYPE:URL"
)
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None):
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None, playlist_debug=0):
qualities = qualities.split(",") if qualities else []
twitch_auth_token = None
@ -651,7 +733,7 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
channel_qualities = ["source"]
else:
raise ValueError(f"Unknown type {type!r}")
manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important)
manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important, history_size=playlist_debug)
managers.append(manager)
def stop():

@ -29,7 +29,8 @@ class Provider:
session = InstrumentedSession()
resp = session.get(uri, metric_name='get_media_playlist')
resp.raise_for_status()
return hls_playlist.load(resp.text, base_uri=resp.url)
playlist = resp.text
return playlist, hls_playlist.load(playlist, base_uri=resp.url)
class URLProvider(Provider):

@ -1,9 +0,0 @@
FROM python:3.11
RUN pip install bokeh
COPY graphs /tmp/graphs
RUN pip install /tmp/graphs && rm -r /tmp/graphs
LABEL org.opencontainers.image.source https://github.com/dbvideostriketeam/wubloader
ENTRYPOINT ["python3", "-m", "graphs"]

@ -1,13 +0,0 @@
import gevent.monkey
gevent.monkey.patch_all()
import logging
import argh
from graphs.main import main
LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s"
logging.basicConfig(level='INFO', format=LOG_FORMAT)
argh.dispatch_command(main)

@ -1,180 +0,0 @@
import gevent.monkey
gevent.monkey.patch_all()
import datetime
import logging
import json
import os
import argh
import bokeh.plotting
import bokeh.models
import bokeh.palettes
import bokeh.settings
import numpy as np
import requests
def format_year(year):
if year > 10:
year += 2006
return 'DBfH {}'.format(year)
def parse_json(json_donations, start_date, end_hour=np.inf, every_five=True):
end_hour = float(end_hour)
times = []
donations = []
for entry in json_donations:
times.append(datetime.datetime(*entry[:5]).isoformat())
donations.append(entry[5])
times = np.array(times, dtype=np.datetime64)
donations = np.asarray(donations)
start_time = np.datetime64(start_date)
bustimes = np.array(times - start_time, dtype=np.int_)
trimmed_bustimes = bustimes[(bustimes <= 60 * 60 * end_hour) & (bustimes >= 0)]
trimmed_donations = donations[(bustimes <= 60 * 60 * end_hour) & (bustimes >= 0)]
if every_five:
five_bustimes = trimmed_bustimes[::5]
five_donations = trimmed_donations[::5]
return five_bustimes, five_donations
else:
return trimmed_bustimes, trimmed_donations
def load_previous_donations(start_end_times, donation_url_template, timeout):
all_years = {}
for year in start_end_times:
start, end = start_end_times[year]
if not end:
current_year = year
continue
url = donation_url_template.format(year, year)
logging.info('Loading {}'.format(url))
year_json = requests.get(url, timeout=timeout).json()
all_years[year] = parse_json(year_json, start, end, year >= 5)
return all_years, current_year
def all_years_donations_graph(start_end_times, all_years, current_year, current_json, base_dir):
logging.info('Generating all years donation graph')
p = bokeh.plotting.figure(x_axis_label='Bus Time', y_axis_label='Donations', x_range=(0, 60 * 60 * 172),
width=1280, height=720, active_scroll='wheel_zoom',
tools='pan,wheel_zoom,box_zoom,reset')
p.add_tools(bokeh.models.HoverTool(tooltips=[('', '$name'), ('Bustime', '@Bustime{00:00:00}'),
('Donations', '$@Donations{0,0.00}')]))
for year in start_end_times:
label = format_year(year)
if year != current_year:
times, donations = all_years[year]
line_width = 2
else:
times, donations = parse_json(current_json, start_end_times[year][0], every_five=False)
line_width = 3
model = bokeh.models.ColumnDataSource(data={'Bustime':times, 'Donations':donations})
p.line(x='Bustime', y='Donations', source=model, line_width=line_width,
line_color=bokeh.palettes.Category20[20][current_year - year],
legend_label=label, name=label)
p.xaxis.ticker = bokeh.models.AdaptiveTicker(mantissas=[60, 120, 300, 600, 1200, 3600, 7200, 10800, 43200, 86400], base=10000000)
p.xaxis.formatter = bokeh.models.NumeralTickFormatter(format='00:00:00')
p.yaxis.formatter = bokeh.models.NumeralTickFormatter(format='$0,0')
p.legend.location = "top_left"
p.legend.click_policy="hide"
output_path = os.path.join(base_dir, 'all_years_donations.html')
bokeh.plotting.output_file(filename=output_path, title='DBfH All Years Donations')
bokeh.plotting.save(p, filename=output_path)
logging.info('{} Saved'.format(output_path))
def shifts_graph(start_end_times, current_year, current_json, base_dir, shifts):
logging.info('Generating DBfH {} shifts graph'.format(current_year))
times, donations = parse_json(current_json, start_end_times[current_year][0], every_five=False)
start_hour = int(start_end_times[current_year][0][11:13])
hours = times / 3600 + start_hour
mod_hours = hours % 24
n_days = int(hours.max() / 24) + 1
p = bokeh.plotting.figure(x_axis_label='Hour of Day', y_axis_label='Donations', x_range=(0, 24 * 3600),
width=1280, height=720, active_scroll='wheel_zoom',
tools='pan,wheel_zoom,box_zoom,reset')
p.add_tools(bokeh.models.HoverTool(tooltips=[('', '$name'), ('Hour of Day', '@Hours{00:00:00}'),
('Donations', '$@Donations{0,0.00}')]))
for day in range(n_days):
for shift in shifts:
in_range = (hours >= day * 24 + shift[1]) & (hours <= day * 24 + shift[2])
hours_in_range = mod_hours[in_range]
if mod_hours[in_range].size:
if hours_in_range[-1] == 0.:
hours_in_range[-1] = 24
model = bokeh.models.ColumnDataSource(data={'Hours':hours_in_range * 3600, 'Donations':donations[in_range] - donations[in_range][0]})
p.line(x='Hours', y='Donations', source=model, line_color=bokeh.palettes.Category10[10][day],
line_width=2, legend_label='Day {}'.format(day + 1), name='Day {} {}'.format(day + 1, shift[0]))
p.xaxis.ticker = bokeh.models.AdaptiveTicker(mantissas=[60, 120, 300, 600, 1200, 3600, 7200, 10800, 43200, 86400], base=10000000)
p.xaxis.formatter = bokeh.models.NumeralTickFormatter(format='00:00:00')
p.yaxis.formatter = bokeh.models.NumeralTickFormatter(format='$0,0')
p.legend.location = "top_left"
p.legend.click_policy="hide"
output_path = os.path.join(base_dir, 'DBfH_{}_shifts_graph.html'.format(current_year))
bokeh.plotting.output_file(filename=output_path, title='{} Shift Donations'.format(format_year(current_year)))
bokeh.plotting.save(p, filename=output_path)
logging.info('{} Saved'.format(output_path))
@argh.arg('--base-dir', help='Directory where graphs are output. Default is current working directory.')
def main(donation_url_template, base_dir='.'):
stopping = gevent.event.Event()
logging.getLogger('bokeh').setLevel(logging.WARNING)
delay = 60 * 1
timeout = 15
shifts = [['Zeta Shift', 0, 6],
['Dawn Guard', 6, 12],
['Alpha Flight', 12, 18],
['Night Watch', 18, 24]]
# First load data required
logging.info('Loading start and end times')
start_end_path = os.path.join(base_dir, 'start_end_times.json')
start_end_times = json.load(open(start_end_path))
start_end_times = {int(year):start_end_times[year] for year in start_end_times}
all_years, current_year = load_previous_donations(start_end_times, donation_url_template, timeout)
current_url = donation_url_template.format(current_year, current_year)
while not stopping.is_set():
try:
logging.info('Loading {}'.format(current_url))
current_json = requests.get(current_url, timeout=timeout).json()
all_years_donations_graph(start_end_times, all_years, current_year, current_json, base_dir)
shifts_graph(start_end_times, current_year, current_json, base_dir, shifts)
except Exception:
logging.exception('Plotting failed. Retrying')
stopping.wait(delay)

@ -1,14 +0,0 @@
from setuptools import setup, find_packages
setup(
name = 'graphs',
version = '0.0.0',
packages = find_packages(),
install_requires = [
'argh',
'bokeh',
'gevent',
'numpy',
'requests'
],
)

@ -91,10 +91,10 @@ def metrics():
"""Return current metrics in prometheus metrics format"""
return prom.generate_latest()
# To make nginx proxying simpler, we want to allow /metrics/* to work
@app.route('/metrics/<trailing>')
# To make nginx proxying simpler, we want to allow /metrics/restreamer to work
@app.route('/metrics/restreamer')
@request_stats
def metrics_with_trailing(trailing):
def metrics_with_trailing():
"""Expose Prometheus metrics."""
return prom.generate_latest()

@ -13,6 +13,15 @@ from .config import get_config
cli = argh.EntryPoint()
def try_save_image(media_dir, attachment):
if media_dir is None:
return {"error": "no media dir given"}
try:
return {"path": media.download_media(attachment["url"], media_dir)}
except media.Rejected as e:
return {"error": str(e)}
def format_account(account):
return f"**[{account['display_name']}]({account['url']})**"
@ -167,11 +176,12 @@ LINE = "\n---"
class Listener(mastodon.StreamListener):
def __init__(self, zulip_client, stream, post_topic, notification_topic):
def __init__(self, zulip_client, stream, post_topic, notification_topic, output_path, media_dir):
self.zulip_client = zulip_client
self.stream = stream
self.post_topic = post_topic
self.notification_topic = notification_topic
self.output_path = output_path
self.media_dir = media_dir
def send(self, topic, content):
logging.info(f"Sending message to {self.stream}/{topic}: {content!r}")
@ -179,25 +189,50 @@ class Listener(mastodon.StreamListener):
def on_update(self, status):
logging.info(f"Got update: {status!r}")
self.output("update", status)
self.send(self.post_topic, format_status(status) + LINE)
def on_delete(self, status_id):
logging.info(f"Got delete: {status_id}")
self.output("delete", payload=status_id)
self.send(self.post_topic, f"*Status with id {status_id} was deleted*")
def on_status_update(self, status):
logging.info(f"Got status update: {status!r}")
self.output("status_update", status)
self.send(self.post_topic, f"*The following status has been updated*\n{format_status(status)}" + LINE)
def on_notification(self, notification):
logging.info(f"Got {notification['type']} notification: {notification!r}")
if notification["type"] != "mention":
return
self.output("mention", notification["status"])
self.send(self.notification_topic, format_status(notification["status"]) + LINE)
def output(self, type, status=None, payload=None):
if self.output_path is None:
return
data = {"type": type}
if status is not None:
data["status"] = status
data["attachments"] = [
try_save_image(self.media_dir, attachment)
for attachment in status["media_attachments"]
]
if payload is not None:
data["payload"] = payload
data = json.dumps(data)
with open(os.path.join(self.output_path, "messages.json"), "a") as f:
f.write(data + "\n")
@cli
def main(conf_file, stream="bot-spam", post_topic="Toots from Desert Bus", notification_topic="Mastodon Notifications"):
def main(
conf_file,
stream="bot-spam",
post_topic="Toots from Desert Bus",
notification_topic="Mastodon Notifications",
):
"""
Run the actual bot.
@ -211,6 +246,8 @@ def main(conf_file, stream="bot-spam", post_topic="Toots from Desert Bus", notif
client_id # only required for get-access-token
client_secret # only required for get-access-token
access_token # only required for main
output_path # optional
media_dir # optional
"""
logging.basicConfig(level='INFO')
@ -220,7 +257,7 @@ def main(conf_file, stream="bot-spam", post_topic="Toots from Desert Bus", notif
zulip_client = zulip.Client(zc["url"], zc["email"], zc["api_key"])
mastodon_client = mastodon.Mastodon(api_base_url=mc["url"], access_token=mc["access_token"])
listener = Listener(zulip_client, stream, post_topic, notification_topic)
listener = Listener(zulip_client, stream, post_topic, notification_topic, conf.get("output_path"), conf.get("media_dir"))
RETRY_INTERVAL = 1
