Compare commits

3 Commits

Author SHA1 Message Date
Mike Lang c0dabdbc31 encode-worker: fix not checking if a row is done (dest_hash not null) in both clearing and claiming code 3 months ago
Mike Lang a45bde07e7 encode worker: Add limit option and quit after current job on SIGTERM 3 months ago
Mike Lang 6c25acfaec encode worker 4 months ago

@@ -7,7 +7,7 @@ RUN apk --update add py3-pip g++ python3-dev libffi-dev musl-dev file make busyb
RUN pip install --upgrade pip wheel
# Install gevent so that we don't need to re-install it when common changes
RUN pip install gevent
RUN pip install gevent==22.10.2
# Install common lib first as it changes less
COPY common /tmp/common

@@ -16,7 +16,6 @@ import argh
import gevent.backdoor
import gevent.pool
import prometheus_client as prom
from requests.adapters import HTTPAdapter
import common
from common import dateutil
@@ -24,10 +23,8 @@ from common import database
from common.requests import InstrumentedSession
from common.segments import list_segment_files, unpadded_b64_decode
# Wraps all requests in some metric collection and connection pooling
# Wraps all requests in some metric collection
requests = InstrumentedSession()
adapter = HTTPAdapter(pool_maxsize=100)
requests.mount('https://', adapter)
segments_backfilled = prom.Counter(
'segments_backfilled',
@@ -47,12 +44,6 @@ hash_mismatches = prom.Counter(
['remote', 'channel', 'quality', 'hour'],
)
small_difference_segments = prom.Gauge(
'small_difference_segments',
'Number of segments which were not pulled due to differing from existing segments by only a very small time difference',
['remote', 'channel', 'quality', 'hour'],
)
node_list_errors = prom.Counter(
'node_list_errors',
'Number of errors fetching a list of nodes',
@@ -513,20 +504,8 @@ class BackfillerWorker(object):
# multiple workers request the same segment at the same time
random.shuffle(missing_segments)
if quality != 'chat':
MATCH_FIELDS = ("channel", "quality", "duration", "type", "hash")
EPSILON = 0.001
local_infos = []
for path in local_segments:
path = os.path.join(channel, quality, hour, path)
try:
local_infos.append(common.parse_segment_path(path))
except ValueError as e:
self.logger.warning('Local file {} could not be parsed: {}'.format(path, e))
pool = gevent.pool.Pool(self.download_concurrency)
workers = []
small_differences = 0
for missing_segment in missing_segments:
@@ -564,29 +543,12 @@ class BackfillerWorker(object):
self.logger.debug('Skipping {} as too recent'.format(path))
continue
# if any local segment is within 1ms of the missing segment and otherwise identical, ignore it
found = None
for local_segment in local_infos:
# if any fields differ, no match
if not all(getattr(segment, field) == getattr(local_segment, field) for field in MATCH_FIELDS):
continue
# if time difference > epsilon, no match
if abs((segment.start - local_segment.start).total_seconds()) > EPSILON:
continue
found = local_segment
break
if found is not None:
self.logger.debug(f'Skipping {path} as within {EPSILON}s of identical segment {found.path}')
continue
# start segment as soon as a pool slot opens up, then track it in workers
workers.append(pool.spawn(
get_remote_segment,
self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger
))
small_difference_segments.labels(self.node, channel, quality, hour).set(small_differences)
# verify that all the workers succeeded. if any failed, raise the exception from
# one of them arbitrarily.
for worker in workers:
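
The comments in the hunk above describe the download pattern without showing it end to end: spawn into a bounded gevent pool so each segment starts as soon as a slot opens, then join every greenlet so any failure is re-raised. A minimal standalone sketch of that flow, assuming a simplified get_remote_segment(segment) signature (the real function takes more arguments, as shown in the hunk):

```python
import gevent.pool

def fetch_missing(missing_segments, download_concurrency, get_remote_segment):
    # Pool.spawn() blocks once download_concurrency greenlets are running,
    # so each download starts as soon as a pool slot frees up.
    pool = gevent.pool.Pool(download_concurrency)
    workers = [pool.spawn(get_remote_segment, segment) for segment in missing_segments]
    # Greenlet.get() re-raises the exception from a failed worker, so a single
    # failure is surfaced here after all downloads have at least been started.
    for worker in workers:
        worker.get()
```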

@@ -39,6 +39,7 @@
prizebot: false,
youtubebot: false,
bus_analyzer: false,
graphs: false,
},
// Twitch channels to capture. The first one will be used as the default channel in the editor.
@@ -213,13 +214,11 @@
backfill_dirs:: ["emotes"],
// Enable saving of media (images and videos - this can be large), either globally or split into
// options:
// three options:
// - From chat messages (in chat_archiver.download_media)
// - From toots
// - From the image links column in the sheet (using sheetsync)
// - Backfilled from other nodes
download_media:: true,
download_toot_media:: $.download_media,
backfill_media:: $.download_media,
download_sheet_links:: $.download_media,
@@ -330,8 +329,6 @@
// Obtain an access token by running: python -m zulip_bots.tootbot get-access-token
access_token: "",
},
[if $.download_toot_media then "media_dir"]: "/mnt/media",
output_path: "/mnt/tootbot.json"
args:: [],
},
@@ -371,6 +368,9 @@
channel_id: "UCz5-PNQxaT4WtB_OMAwD85g", // DesertBusForHope
},
// template for donation data urls
donation_url_template:: "https://example.com/DB{}/DB{}.json",
// Extra options to pass via environment variables,
// eg. log level, disabling stack sampling.
env:: {
@@ -756,7 +756,15 @@
environment: $.env,
},
local bot_service(name, config, args=[], subcommand=null, mount_segments=false) = {
[if $.enabled.graphs then "graphs"]: {
image: $.get_image("graphs"),
restart: "always",
command: [$.donation_url_template, "--base-dir", "/mnt/graphs"],
volumes: ["%s:/mnt" % $.segments_path],
environment: $.env,
},
local bot_service(name, config, args=[], subcommand=null) = {
image: $.get_image("zulip_bots"),
restart: "always",
entrypoint: ["python3", "-m", "zulip_bots.%s" % name]
@@ -764,7 +772,6 @@
+ [std.manifestJson(config)]
+ args,
environment: $.env,
[if mount_segments then "volumes"]: ["%s:/mnt" % $.segments_path],
},
[if $.enabled.schedulebot then "schedulebot"]:
@@ -780,7 +787,7 @@
[if $.enabled.tootbot then "tootbot"]:
bot_service("tootbot", $.tootbot + {
zulip+: { url: $.zulip_url },
}, $.tootbot.args, subcommand="main", mount_segments=true),
}, $.tootbot.args, subcommand="main"),
[if $.enabled.twitchbot then "twitchbot"]:
bot_service("twitchbot", $.twitchbot + {
@@ -790,12 +797,16 @@
[if $.enabled.pubbot then "pubbot"]:
bot_service("pubbot", $.pubbot + {
zulip_url: $.zulip_url,
}, ["/mnt/pubnub-log.json"], mount_segments=true),
}, ["/mnt/pubnub-log.json"]) + {
volumes: ["%s:/mnt" % $.segments_path],
},
[if $.enabled.blogbot then "blogbot"]:
bot_service("blogbot", $.blogbot + {
zulip_url: $.zulip_url,
}, ["--save-dir", "/mnt/blogs"], mount_segments=true),
}, ["--save-dir", "/mnt/blogs"]) + {
volumes: ["%s:/mnt" % $.segments_path],
},
[if $.enabled.prizebot then "prizebot"]:
bot_service("prizebot", $.prizebot + {

@@ -15,7 +15,6 @@ import gevent.backdoor
import gevent.event
import prometheus_client as prom
import requests
import requests.adapters
from monotonic import monotonic
import common
@@ -50,24 +49,6 @@ ad_segments_ignored = prom.Counter(
["channel", "quality"],
)
suspicious_skew_count = prom.Counter(
"suspicious_skew_count",
"Number of times we've restarted a worker due to suspicious skew",
["channel", "quality"],
)
segment_time_skew_non_zero_sum = prom.Gauge(
"segment_time_skew_non_zero_sum",
"Sum of all observed segment skew amounts for worker",
["channel", "quality", "worker"],
)
segment_time_skew_non_zero_count = prom.Counter(
"segment_time_skew_non_zero_count",
"Count of segments with non-zero skew for worker",
["channel", "quality", "worker"],
)
class TimedOutError(Exception):
pass
@@ -141,7 +122,7 @@ class StreamsManager(object):
FETCH_TIMEOUTS = 5, 30
def __init__(self, provider, channel, base_dir, qualities, important=False, history_size=0):
def __init__(self, provider, channel, base_dir, qualities, important=False):
self.provider = provider
self.channel = channel
self.logger = logging.getLogger("StreamsManager({})".format(channel))
@@ -222,7 +203,7 @@ class StreamsManager(object):
self.logger.info("Ignoring worker start as we are stopping")
return
url_time, url = self.latest_urls[quality]
worker = StreamWorker(self, quality, url, url_time, self.history_size)
worker = StreamWorker(self, quality, url, url_time)
self.stream_workers[quality].append(worker)
gevent.spawn(worker.run)
@@ -309,12 +290,7 @@ class StreamWorker(object):
FETCH_RETRY_INTERVAL = 1
FETCH_POLL_INTERVAL = 2
# Max difference between a segment's time + duration and the next segment's time (in seconds)
# before we consider this "suspicious" and trigger a refresh.
# See https://github.com/dbvideostriketeam/wubloader/issues/539
MAX_SEGMENT_TIME_SKEW = 0.01
def __init__(self, manager, quality, url, url_time, history_size):
def __init__(self, manager, quality, url, url_time):
self.manager = manager
self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self)))
self.quality = quality
@@ -329,16 +305,11 @@ class StreamWorker(object):
# This worker's SegmentGetters will use its session by default for performance,
# but will fall back to a new one if something goes wrong.
self.session = common.requests.InstrumentedSession()
adapter = requests.adapters.HTTPAdapter(pool_maxsize=100)
self.session.mount('https://', adapter)
# Map cache is a simple cache to avoid re-downloading the same map URI for every segment,
# since it's generally the same but may occasionally change.
# We expect the map data to be very small so there is no eviction here.
# {uri: data}
self.map_cache = {}
# If enabled, playlist history is saved after each segment fetch,
# showing the last N playlist fetches up until the one that resulted in that fetch.
self.history_size = history_size
def __repr__(self):
return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality)
@@ -376,15 +347,12 @@ class StreamWorker(object):
def _run(self):
first = True
suspicious_skew = False
history = []
while not self.stopping.is_set():
self.logger.debug("Getting media playlist {}".format(self.url))
try:
with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
playlist_time = datetime.datetime.utcnow()
raw_playlist, playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
except Exception as e:
self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True)
self.trigger_new_worker()
@@ -400,12 +368,8 @@ class StreamWorker(object):
# We successfully got the playlist at least once
first = False
if self.history_size > 0:
history = [(playlist_time, raw_playlist)] + history[:self.history_size]
# Start any new segment getters
date = None # tracks date in case some segment doesn't include it
prev_segment = None
for segment in playlist.segments:
if segment.ad_reason:
self.logger.info("Ignoring ad segment: {}".format(segment.ad_reason))
@@ -416,26 +380,7 @@ class StreamWorker(object):
self.manager.mark_working(self)
if segment.date:
new_date = common.dateutil.parse(segment.date)
if date is not None:
# We have observed an issue on twitch where there will be a small time jump
# (eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
# and all subsequent segment timestamps will be out by this amount compared
# to what other downloader instances see. Our workaround for this issue is to
# watch for such gaps and:
# 1. trigger a worker refresh (which seems to fix the issue)
# 2. treat all subsequent segments as "suspect" so they are still saved
# but only used if no other source is available.
skew = (date - new_date).total_seconds()
if skew != 0:
segment_time_skew_non_zero_sum.labels(self.manager.channel, self.quality, f"{id(self):x}").inc(skew)
segment_time_skew_non_zero_count.labels(self.manager.channel, self.quality, f"{id(self):x}").inc()
if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew:
self.logger.warning(f"Suspicious skew of {skew}, triggering new worker: Expected {date} after {prev_segment}, got {new_date} for {segment}")
self.trigger_new_worker()
suspicious_skew = True
suspicious_skew_count.labels(self.manager.channel, self.quality).inc()
date = new_date
date = common.dateutil.parse(segment.date)
if segment.uri not in self.getters:
if date is None:
raise ValueError("Cannot determine date of segment")
@@ -447,14 +392,11 @@ class StreamWorker(object):
self.quality,
segment,
date,
suspicious_skew,
self.map_cache,
history,
)
gevent.spawn(self.getters[segment.uri].run)
if date is not None:
date += datetime.timedelta(seconds=segment.duration)
prev_segment = segment
# Clean up any old segment getters.
# Note use of list() to make a copy to avoid modification-during-iteration
@@ -506,21 +448,19 @@ class SegmentGetter(object):
# or so, to be paranoid we set it to considerably longer than that.
GIVE_UP_TIMEOUT = 20 * 60
def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache, history):
def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache):
self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
self.base_dir = base_dir
self.channel = channel
self.quality = quality
self.segment = segment
self.date = date
self.suspect = suspect
self.map_cache = map_cache
self.prefix = self.make_path_prefix()
self.retry = None # Event, set to begin retrying
self.done = gevent.event.Event() # set when file exists or we give up
# Our parent's connection pool, but we'll replace it if there's any issues
self.session = session
self.history = history
def run(self):
try:
@@ -577,12 +517,6 @@ class SegmentGetter(object):
Type may be:
full: Segment is complete. Hash is included.
suspect: Segment appears to be complete, but we suspect it is not. Hash is included.
This currently triggers on two conditions:
- If a segment takes a very long time to download, which we've observed to result in
partial files even though they appeared to end correctly.
- If the StreamWorker has encountered a time gap, then we suspect this segment to be
mis-timed. We have observed this where there is a small (~0.5s) time jump, then
all segments are consistently off by that amount compared to other nodes until refresh.
partial: Segment is incomplete. Hash is included.
temp: Segment has not been downloaded yet. A random uuid is added.
"""
@@ -667,9 +601,7 @@ class SegmentGetter(object):
raise e
else:
request_duration = monotonic() - start_time
segment_type = "full"
if self.suspect or request_duration >= self.FETCH_SUSPECT_TIME:
segment_type = "suspect"
segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect"
full_path = self.make_path(segment_type, hash)
self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
common.rename(temp_path, full_path)
@@ -680,20 +612,6 @@ class SegmentGetter(object):
stat = latest_segment.labels(channel=self.channel, quality=self.quality)
timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds()
stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe
if self.history:
self.write_history(full_path)
def write_history(self, segment_path):
segment_path = os.path.relpath(segment_path, self.base_dir)
history_path = os.path.join(self.base_dir, "playlist-debug", segment_path)
try:
os.makedirs(history_path)
except FileExistsError:
pass
for n, (timestamp, playlist) in enumerate(self.history):
filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"))
path = os.path.join(history_path, filename)
common.atomic_write(path, playlist)
def parse_channel(channel):
@@ -712,7 +630,7 @@ def parse_channel(channel):
"This affects retry interval, error reporting and monitoring. "
"Non-twitch URLs can also be given with the form CHANNEL[!]:TYPE:URL"
)
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None, playlist_debug=0):
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None):
qualities = qualities.split(",") if qualities else []
twitch_auth_token = None
@@ -733,7 +651,7 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
channel_qualities = ["source"]
else:
raise ValueError(f"Unknown type {type!r}")
manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important, history_size=playlist_debug)
manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important)
managers.append(manager)
def stop():
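
The suspicious-skew logic is spread across several hunks above; condensed, the check compares the time a segment advertises against where the previous segment's start plus duration said it should be, and flags anything past a small threshold. An illustrative distillation (this helper is not part of the codebase, it only restates the logic shown in the hunks):

```python
import datetime

MAX_SEGMENT_TIME_SKEW = 0.01  # seconds, matching the StreamWorker constant above

def check_segment_skew(expected_date, segment_date, segment_duration):
    # How far the advertised timestamp drifts from the expected one
    # (previous segment's date advanced by its duration).
    skew = (expected_date - segment_date).total_seconds()
    suspicious = abs(skew) > MAX_SEGMENT_TIME_SKEW
    # The worker then advances its expectation for the following segment.
    next_expected = segment_date + datetime.timedelta(seconds=segment_duration)
    return skew, suspicious, next_expected
```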

@@ -29,8 +29,7 @@ class Provider:
session = InstrumentedSession()
resp = session.get(uri, metric_name='get_media_playlist')
resp.raise_for_status()
playlist = resp.text
return playlist, hls_playlist.load(playlist, base_uri=resp.url)
return hls_playlist.load(resp.text, base_uri=resp.url)
class URLProvider(Provider):
@@ -140,7 +139,6 @@ class TwitchProvider(Provider):
"allow_audio_only": "true",
"allow_spectre": "false",
"fast_bread": "true",
"supported_codecs": "av1,h265,h264",
"sig": sig,
"token": token,
},

@@ -0,0 +1,9 @@
FROM python:3.11
RUN pip install bokeh
COPY graphs /tmp/graphs
RUN pip install /tmp/graphs && rm -r /tmp/graphs
LABEL org.opencontainers.image.source https://github.com/dbvideostriketeam/wubloader
ENTRYPOINT ["python3", "-m", "graphs"]

@@ -0,0 +1,13 @@
import gevent.monkey
gevent.monkey.patch_all()
import logging
import argh
from graphs.main import main
LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s"
logging.basicConfig(level='INFO', format=LOG_FORMAT)
argh.dispatch_command(main)

@@ -0,0 +1,180 @@
import gevent.monkey
gevent.monkey.patch_all()
import datetime
import logging
import json
import os
import argh
import bokeh.plotting
import bokeh.models
import bokeh.palettes
import bokeh.settings
import numpy as np
import requests
def format_year(year):
if year > 10:
year += 2006
return 'DBfH {}'.format(year)
def parse_json(json_donations, start_date, end_hour=np.inf, every_five=True):
end_hour = float(end_hour)
times = []
donations = []
for entry in json_donations:
times.append(datetime.datetime(*entry[:5]).isoformat())
donations.append(entry[5])
times = np.array(times, dtype=np.datetime64)
donations = np.asarray(donations)
start_time = np.datetime64(start_date)
bustimes = np.array(times - start_time, dtype=np.int_)
trimmed_bustimes = bustimes[(bustimes <= 60 * 60 * end_hour) & (bustimes >= 0)]
trimmed_donations = donations[(bustimes <= 60 * 60 * end_hour) & (bustimes >= 0)]
if every_five:
five_bustimes = trimmed_bustimes[::5]
five_donations = trimmed_donations[::5]
return five_bustimes, five_donations
else:
return trimmed_bustimes, trimmed_donations
def load_previous_donations(start_end_times, donation_url_template, timeout):
all_years = {}
for year in start_end_times:
start, end = start_end_times[year]
if not end:
current_year = year
continue
url = donation_url_template.format(year, year)
logging.info('Loading {}'.format(url))
year_json = requests.get(url, timeout=timeout).json()
all_years[year] = parse_json(year_json, start, end, year >= 5)
return all_years, current_year
def all_years_donations_graph(start_end_times, all_years, current_year, current_json, base_dir):
logging.info('Generating all years donation graph')
p = bokeh.plotting.figure(x_axis_label='Bus Time', y_axis_label='Donations', x_range=(0, 60 * 60 * 172),
width=1280, height=720, active_scroll='wheel_zoom',
tools='pan,wheel_zoom,box_zoom,reset')
p.add_tools(bokeh.models.HoverTool(tooltips=[('', '$name'), ('Bustime', '@Bustime{00:00:00}'),
('Donations', '$@Donations{0,0.00}')]))
for year in start_end_times:
label = format_year(year)
if year != current_year:
times, donations = all_years[year]
line_width = 2
else:
times, donations = parse_json(current_json, start_end_times[year][0], every_five=False)
line_width = 3
model = bokeh.models.ColumnDataSource(data={'Bustime':times, 'Donations':donations})
p.line(x='Bustime', y='Donations', source=model, line_width=line_width,
line_color=bokeh.palettes.Category20[20][current_year - year],
legend_label=label, name=label)
p.xaxis.ticker = bokeh.models.AdaptiveTicker(mantissas=[60, 120, 300, 600, 1200, 3600, 7200, 10800, 43200, 86400], base=10000000)
p.xaxis.formatter = bokeh.models.NumeralTickFormatter(format='00:00:00')
p.yaxis.formatter = bokeh.models.NumeralTickFormatter(format='$0,0')
p.legend.location = "top_left"
p.legend.click_policy="hide"
output_path = os.path.join(base_dir, 'all_years_donations.html')
bokeh.plotting.output_file(filename=output_path, title='DBfH All Years Donations')
bokeh.plotting.save(p, filename=output_path)
logging.info('{} Saved'.format(output_path))
def shifts_graph(start_end_times, current_year, current_json, base_dir, shifts):
logging.info('Generating DBfH {} shifts graph'.format(current_year))
times, donations = parse_json(current_json, start_end_times[current_year][0], every_five=False)
start_hour = int(start_end_times[current_year][0][11:13])
hours = times / 3600 + start_hour
mod_hours = hours % 24
n_days = int(hours.max() / 24) + 1
p = bokeh.plotting.figure(x_axis_label='Hour of Day', y_axis_label='Donations', x_range=(0, 24 * 3600),
width=1280, height=720, active_scroll='wheel_zoom',
tools='pan,wheel_zoom,box_zoom,reset')
p.add_tools(bokeh.models.HoverTool(tooltips=[('', '$name'), ('Hour of Day', '@Hours{00:00:00}'),
('Donations', '$@Donations{0,0.00}')]))
for day in range(n_days):
for shift in shifts:
in_range = (hours >= day * 24 + shift[1]) & (hours <= day * 24 + shift[2])
hours_in_range = mod_hours[in_range]
if mod_hours[in_range].size:
if hours_in_range[-1] == 0.:
hours_in_range[-1] = 24
model = bokeh.models.ColumnDataSource(data={'Hours':hours_in_range * 3600, 'Donations':donations[in_range] - donations[in_range][0]})
p.line(x='Hours', y='Donations', source=model, line_color=bokeh.palettes.Category10[10][day],
line_width=2, legend_label='Day {}'.format(day + 1), name='Day {} {}'.format(day + 1, shift[0]))
p.xaxis.ticker = bokeh.models.AdaptiveTicker(mantissas=[60, 120, 300, 600, 1200, 3600, 7200, 10800, 43200, 86400], base=10000000)
p.xaxis.formatter = bokeh.models.NumeralTickFormatter(format='00:00:00')
p.yaxis.formatter = bokeh.models.NumeralTickFormatter(format='$0,0')
p.legend.location = "top_left"
p.legend.click_policy="hide"
output_path = os.path.join(base_dir, 'DBfH_{}_shifts_graph.html'.format(current_year))
bokeh.plotting.output_file(filename=output_path, title='{} Shift Donations'.format(format_year(current_year)))
bokeh.plotting.save(p, filename=output_path)
logging.info('{} Saved'.format(output_path))
@argh.arg('--base-dir', help='Directory where graphs are output. Default is current working directory.')
def main(donation_url_template, base_dir='.'):
stopping = gevent.event.Event()
logging.getLogger('bokeh').setLevel(logging.WARNING)
delay = 60 * 1
timeout = 15
shifts = [['Zeta Shift', 0, 6],
['Dawn Guard', 6, 12],
['Alpha Flight', 12, 18],
['Night Watch', 18, 24]]
# First load data required
logging.info('Loading start and end times')
start_end_path = os.path.join(base_dir, 'start_end_times.json')
start_end_times = json.load(open(start_end_path))
start_end_times = {int(year):start_end_times[year] for year in start_end_times}
all_years, current_year = load_previous_donations(start_end_times, donation_url_template, timeout)
current_url = donation_url_template.format(current_year, current_year)
while not stopping.is_set():
try:
logging.info('Loading {}'.format(current_url))
current_json = requests.get(current_url, timeout=timeout).json()
all_years_donations_graph(start_end_times, all_years, current_year, current_json, base_dir)
shifts_graph(start_end_times, current_year, current_json, base_dir, shifts)
except Exception:
logging.exception('Plotting failed. Retrying')
stopping.wait(delay)
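
For context on the data this new module consumes: parse_json() above indexes each donation entry as entry[:5] (year, month, day, hour, minute) plus entry[5] (running total), and shifts_graph() reads the hour out of characters 11-13 of the start string, so the start/end times are assumed to be ISO-style timestamps. A hypothetical example (all values invented) of how the feed would be parsed:

```python
# Assumed feed shape: [year, month, day, hour, minute, running_total]
example_json_donations = [
    [2023, 11, 11, 22, 0, 0.00],
    [2023, 11, 11, 22, 5, 1234.56],
    [2023, 11, 11, 22, 10, 2468.10],
]
# start_end_times.json is assumed to map year -> [start, end] strings,
# with a falsey end marking the current year (see load_previous_donations).
bustimes, totals = parse_json(example_json_donations, "2023-11-11T22:00", every_five=False)
# bustimes is seconds since the given start time: array([  0, 300, 600])
```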

@@ -0,0 +1,14 @@
from setuptools import setup, find_packages
setup(
name = 'graphs',
version = '0.0.0',
packages = find_packages(),
install_requires = [
'argh',
'bokeh',
'gevent',
'numpy',
'requests'
],
)

@@ -91,10 +91,10 @@ def metrics():
"""Return current metrics in prometheus metrics format"""
return prom.generate_latest()
# To make nginx proxying simpler, we want to allow /metrics/restreamer to work
@app.route('/metrics/restreamer')
# To make nginx proxying simpler, we want to allow /metrics/* to work
@app.route('/metrics/<trailing>')
@request_stats
def metrics_with_trailing():
def metrics_with_trailing(trailing):
"""Expose Prometheus metrics."""
return prom.generate_latest()
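
A quick way to sanity-check the catch-all route, using Flask's test client (this assumes the app object used by the decorators above is importable; its module isn't shown in this diff, and the path suffixes are just examples):

```python
client = app.test_client()
# Every suffix under /metrics/ now serves the same Prometheus exposition text,
# which keeps per-service nginx proxy rules uniform.
for path in ("/metrics/restreamer", "/metrics/downloader"):
    assert client.get(path).status_code == 200
```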

@@ -157,7 +157,7 @@ def main(conf_file, message_log_file, name=socket.gethostname()):
logging.info("New donation total: {}{}{}".format(msg["d"], increase_str, entries_str))
total = msg["d"]
if increase is not None and increase > 0:
client.send_to_stream("firehose", "Donations", "Donation total is now ${:.2f}{}{}".format(msg["d"], increase_str, entries_str))
client.send_to_stream("bot-spam", "Donation Firehose", "Donation total is now ${:.2f}{}{}".format(msg["d"], increase_str, entries_str))
if increase is not None and increase >= 500:
client.send_to_stream("bot-spam", "Notable Donations", "Large donation of ${:.2f} (total ${:.2f}){}".format(increase, msg['d'], entries_str))

@@ -13,15 +13,6 @@ from .config import get_config
cli = argh.EntryPoint()
def try_save_image(media_dir, attachment):
if media_dir is None:
return {"error": "no media dir given"}
try:
return {"path": media.download_media(attachment["url"], media_dir)}
except media.Rejected as e:
return {"error": str(e)}
def format_account(account):
return f"**[{account['display_name']}]({account['url']})**"
@@ -176,12 +167,11 @@ LINE = "\n---"
class Listener(mastodon.StreamListener):
def __init__(self, zulip_client, stream, post_topic, notification_topic, output_path, media_dir):
def __init__(self, zulip_client, stream, post_topic, notification_topic):
self.zulip_client = zulip_client
self.stream = stream
self.post_topic = post_topic
self.notification_topic = notification_topic
self.output_path = output_path
def send(self, topic, content):
logging.info(f"Sending message to {self.stream}/{topic}: {content!r}")
@@ -189,50 +179,25 @@ class Listener(mastodon.StreamListener):
def on_update(self, status):
logging.info(f"Got update: {status!r}")
self.output("update", status)
self.send(self.post_topic, format_status(status) + LINE)
def on_delete(self, status_id):
logging.info(f"Got delete: {status_id}")
self.output("delete", payload=status_id)
self.send(self.post_topic, f"*Status with id {status_id} was deleted*")
def on_status_update(self, status):
logging.info(f"Got status update: {status!r}")
self.output("status_update", status)
self.send(self.post_topic, f"*The following status has been updated*\n{format_status(status)}" + LINE)
def on_notification(self, notification):
logging.info(f"Got {notification['type']} notification: {notification!r}")
if notification["type"] != "mention":
return
self.output("mention", notification["status"])
self.send(self.notification_topic, format_status(notification["status"]) + LINE)
def output(self, type, status=None, payload=None):
if self.output_path is None:
return
data = {"type": type}
if status is not None:
data["status"] = status
data["attachments"] = [
try_save_image(self.media_dir, attachment)
for attachment in status["media_attachments"]
]
if payload is not None:
data["payload"] = payload
data = json.dumps(data)
with open(os.path.join(self.output_path, "messages.json"), "a") as f:
f.write(data + "\n")
@cli
def main(
conf_file,
stream="bot-spam",
post_topic="Toots from Desert Bus",
notification_topic="Mastodon Notifications",
):
def main(conf_file, stream="bot-spam", post_topic="Toots from Desert Bus", notification_topic="Mastodon Notifications"):
"""
Run the actual bot.
@@ -246,8 +211,6 @@ def main(
client_id # only required for get-access-token
client_secret # only required for get-access-token
access_token # only required for main
output_path # optional
media_dir # optional
"""
logging.basicConfig(level='INFO')
@@ -257,7 +220,7 @@ def main(
zulip_client = zulip.Client(zc["url"], zc["email"], zc["api_key"])
mastodon_client = mastodon.Mastodon(api_base_url=mc["url"], access_token=mc["access_token"])
listener = Listener(zulip_client, stream, post_topic, notification_topic, conf.get("output_path"), conf.get("media_dir"))
listener = Listener(zulip_client, stream, post_topic, notification_topic)
RETRY_INTERVAL = 1

@@ -11,30 +11,6 @@ from common.googleapis import GoogleAPIClient
from .config import get_config
from .zulip import Client
_video_title_cache = {}
def get_video_title(google, video_id):
if video_id not in _video_title_cache:
_video_title_cache[video_id] = _get_video_title(google, video_id)
return _video_title_cache[video_id]
def _get_video_title(google, video_id):
resp = google.request("GET",
"https://www.googleapis.com/youtube/v3/videos",
params={
"part": "snippet",
"id": video_id,
}
)
resp.raise_for_status()
items = resp.json()["items"]
if not items:
# This should be rare, and it's ok to just crash
raise Exception(f"Request for video id {video_id} returned no results, was it deleted?")
return items[0]["snippet"]["title"]
def get_comments(google, channel_id):
resp = google.request("GET",
"https://www.googleapis.com/youtube/v3/commentThreads",
@@ -63,12 +39,10 @@ def get_comments(google, channel_id):
return comments
def show_comment(zulip, google, stream, topic, comment):
title = get_video_title(google, comment["videoId"])
def show_comment(zulip, stream, topic, comment):
c = comment["snippet"]
author = f"[{c['authorDisplayName']}]({c['authorChannelUrl']})"
url = f"https://youtu.be/{comment['videoId']}"
video = f"[{title}]({url})"
video = f"https://youtu.be/{comment['videoId']}"
message = f"{author} commented on {video}:\n```quote\n{c['textDisplay']}\n```"
logging.info(f"Sending message to {stream}/{topic}: {message!r}")
# Empty stream acts as a dry-run mode
@@ -118,7 +92,7 @@ def main(conf_file, interval=60, one_off=0, stream="bot-spam", topic="Youtube Co
if comment["id"] in seen:
logging.debug(f"Comment {comment['id']} already seen, skipping")
continue
show_comment(zulip, google, stream, topic, comment)
show_comment(zulip, stream, topic, comment)
seen.append(comment["id"])
seen = seen[-keep:]
