Compare commits

...

13 Commits

Author SHA1 Message Date
Mike Lang 60f17f2001 encode-worker: Script and database table for managing mass encodes
- create table `encodes` that contains info on encode jobs to run
- create encoder user that can modify only the designated columns of that table
- bash script that runs in a loop, taking and performing jobs from that table
3 days ago
Mike Lang 06d5835912 remove graphs service
We've decided not to use this in its current form.

Closes #545
3 days ago
Mike Lang d1d0b1bad2 tootbot: Save messages and images to disk 3 days ago
Mike Lang 270c38d50d docker-compose: add mount_segments option for bot_service() 3 days ago
Mike Lang 7b1c8c6d20 optionally enabled debugging: Write out last 10 media playlists after fetching each segment 3 days ago
Mike Lang 065c72f71f downloader: Re-connect when we see a time error over 0.01s
We have observed an issue on twitch where there will be a small time jump
(eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
and all subsequent segment timestamps will be out by this amount compared
to what other downloader instances see. Our workaround for this issue is to
watch for such gaps and:
1. trigger a worker refresh (which seems to fix the issue)
2. treat all subsequent segments as "suspect" so they are still saved
but only used if no other source is available.
3 days ago
Mike Lang c16d052f79 backfiller: Don't backfill segments that only differ from existing segment by 1ms
In the wild we've seen different servers get timestamps that differ by 1ms for segments
that are otherwise identical - same content, same duration.

The allowable fudge factor for segments is already 10ms, so having timing be 1ms different
between servers shouldn't cause any problems.
Worst case, there's a slight chance you'll get an adjacent frame when picking a cut point / thumbnail.
3 days ago
Mike Lang 5c524717f2 Fix connection pool warnings by increasing pool size
in backfiller and downloader, the things making lots of outgoing http requests.

We want these larger sizes anyway to improve performance in downloader and backfiller.
3 days ago
Mike Lang d5fbcd5790 Don't have restreamer respond with metrics to non-restreamer metrics requests 3 days ago
Mike Lang d7facca842 pubbot: Change donation firehose stream 2 months ago
Mike Lang a6a2ca1a96 fix api_ping as gevent now somehow fails with this pinned version
```
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/usr/lib/python3.11/site-packages/api_ping/__main__.py", line 3, in <module>
    gevent.monkey.patch_all()
  File "/usr/lib/python3.11/site-packages/gevent/monkey.py", line 1255, in patch_all
    _notify_patch(events.GeventWillPatchAllEvent(modules_to_patch, kwargs), _warnings)
  File "/usr/lib/python3.11/site-packages/gevent/monkey.py", line 190, in _notify_patch
    notify_and_call_entry_points(event)
  File "/usr/lib/python3.11/site-packages/gevent/events.py", line 104, in notify_and_call_entry_points
    subscriber = plugin.load()
                 ^^^^^^^^^^^^^
  File "/usr/lib/python3.11/site-packages/pkg_resources/__init__.py", line 2745, in load
    self.require(*args, **kwargs)  # type: ignore
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/site-packages/pkg_resources/__init__.py", line 2773, in require
    items = working_set.resolve(reqs, env, installer, extras=self.extras)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/site-packages/pkg_resources/__init__.py", line 889, in resolve
    dist = self._resolve_dist(
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/site-packages/pkg_resources/__init__.py", line 930, in _resolve_dist
    raise DistributionNotFound(req, requirers)
pkg_resources.DistributionNotFound: The 'zope.event' distribution was not found and is required by the application
```

god i hate python packaging.
3 months ago
Mike Lang 58f85b30ac youtubebot: Show the video title
This is an additional 1 quota per video, but with a cache (it's ok if it's out of date)
this should be no issue.
3 months ago
Mike Lang 094edfc701 downloader: support 2k/4k twitch streams
You need to explicitly say you support x265
4 months ago

@ -7,7 +7,7 @@ RUN apk --update add py3-pip g++ python3-dev libffi-dev musl-dev file make busyb
RUN pip install --upgrade pip wheel RUN pip install --upgrade pip wheel
# Install gevent so that we don't need to re-install it when common changes # Install gevent so that we don't need to re-install it when common changes
RUN pip install gevent==22.10.2 RUN pip install gevent
# Install common lib first as it changes less # Install common lib first as it changes less
COPY common /tmp/common COPY common /tmp/common

@ -16,6 +16,7 @@ import argh
import gevent.backdoor import gevent.backdoor
import gevent.pool import gevent.pool
import prometheus_client as prom import prometheus_client as prom
from requests.adapters import HTTPAdapter
import common import common
from common import dateutil from common import dateutil
@ -23,8 +24,10 @@ from common import database
from common.requests import InstrumentedSession from common.requests import InstrumentedSession
from common.segments import list_segment_files, unpadded_b64_decode from common.segments import list_segment_files, unpadded_b64_decode
# Wraps all requests in some metric collection # Wraps all requests in some metric collection and connection pooling
requests = InstrumentedSession() requests = InstrumentedSession()
adapter = HTTPAdapter(pool_maxsize=100)
requests.mount('https://', adapter)
segments_backfilled = prom.Counter( segments_backfilled = prom.Counter(
'segments_backfilled', 'segments_backfilled',
@ -44,6 +47,12 @@ hash_mismatches = prom.Counter(
['remote', 'channel', 'quality', 'hour'], ['remote', 'channel', 'quality', 'hour'],
) )
small_difference_segments = prom.Gauge(
'small_difference_segments',
'Number of segments which were not pulled due to differing from existing segments by only a very small time difference',
['remote', 'channel', 'quality', 'hour'],
)
node_list_errors = prom.Counter( node_list_errors = prom.Counter(
'node_list_errors', 'node_list_errors',
'Number of errors fetching a list of nodes', 'Number of errors fetching a list of nodes',
@ -504,9 +513,21 @@ class BackfillerWorker(object):
# multiple workers request the same segment at the same time # multiple workers request the same segment at the same time
random.shuffle(missing_segments) random.shuffle(missing_segments)
if quality != 'chat':
MATCH_FIELDS = ("channel", "quality", "duration", "type", "hash")
EPSILON = 0.001
local_infos = []
for path in local_segments:
path = os.path.join(channel, quality, hour, path)
try:
local_infos.append(common.parse_segment_path(path))
except ValueError as e:
self.logger.warning('Local file {} could not be parsed: {}'.format(path, e))
pool = gevent.pool.Pool(self.download_concurrency) pool = gevent.pool.Pool(self.download_concurrency)
workers = [] workers = []
small_differences = 0
for missing_segment in missing_segments: for missing_segment in missing_segments:
if self.stopping.is_set(): if self.stopping.is_set():
@ -542,6 +563,21 @@ class BackfillerWorker(object):
if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=self.recent_cutoff): if datetime.datetime.utcnow() - segment.start < datetime.timedelta(seconds=self.recent_cutoff):
self.logger.debug('Skipping {} as too recent'.format(path)) self.logger.debug('Skipping {} as too recent'.format(path))
continue continue
# if any local segment is within 1ms of the missing segment and otherwise identical, ignore it
found = None
for local_segment in local_infos:
# if any fields differ, no match
if not all(getattr(segment, field) == getattr(local_segment, field) for field in MATCH_FIELDS):
continue
# if time difference > epsilon, no match
if abs((segment.start - local_segment.start).total_seconds()) > EPSILON:
continue
found = local_segment
break
if found is not None:
self.logger.debug(f'Skipping {path} as within {EPSILON}s of identical segment {found.path}')
continue
# start segment as soon as a pool slot opens up, then track it in workers # start segment as soon as a pool slot opens up, then track it in workers
workers.append(pool.spawn( workers.append(pool.spawn(
@ -549,6 +585,8 @@ class BackfillerWorker(object):
self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger self.base_dir, self.node, channel, quality, hour, missing_segment, self.logger
)) ))
small_difference_segments.labels(self.node, channel, quality, hour).set(small_differences)
# verify that all the workers succeeded. if any failed, raise the exception from # verify that all the workers succeeded. if any failed, raise the exception from
# one of them arbitrarily. # one of them arbitrarily.
for worker in workers: for worker in workers:

@ -39,7 +39,6 @@
prizebot: false, prizebot: false,
youtubebot: false, youtubebot: false,
bus_analyzer: false, bus_analyzer: false,
graphs: false,
}, },
// Twitch channels to capture. The first one will be used as the default channel in the editor. // Twitch channels to capture. The first one will be used as the default channel in the editor.
@ -214,11 +213,13 @@
backfill_dirs:: ["emotes"], backfill_dirs:: ["emotes"],
// Enable saving of media (images and videos - this can be large), either globally or split into // Enable saving of media (images and videos - this can be large), either globally or split into
// three options: // options:
// - From chat messages (in chat_archiver.download_media) // - From chat messages (in chat_archiver.download_media)
// - From toots
// - From the image links column in the sheet (using sheetsync) // - From the image links column in the sheet (using sheetsync)
// - Backfilled from other nodes // - Backfilled from other nodes
download_media:: true, download_media:: true,
download_toot_media:: $.download_media,
backfill_media:: $.download_media, backfill_media:: $.download_media,
download_sheet_links:: $.download_media, download_sheet_links:: $.download_media,
@ -329,6 +330,8 @@
// Obtain an access token by running: python -m zulip_bots.tootbot get-access-token // Obtain an access token by running: python -m zulip_bots.tootbot get-access-token
access_token: "", access_token: "",
}, },
[if $.download_toot_media then "media_dir"]: "/mnt/media",
output_path: "/mnt/tootbot.json"
args:: [], args:: [],
}, },
@ -368,9 +371,6 @@
channel_id: "UCz5-PNQxaT4WtB_OMAwD85g", // DesertBusForHope channel_id: "UCz5-PNQxaT4WtB_OMAwD85g", // DesertBusForHope
}, },
// template for donation data urls
donation_url_template:: "https://example.com/DB{}/DB{}.json",
// Extra options to pass via environment variables, // Extra options to pass via environment variables,
// eg. log level, disabling stack sampling. // eg. log level, disabling stack sampling.
env:: { env:: {
@ -756,15 +756,7 @@
environment: $.env, environment: $.env,
}, },
[if $.enabled.graphs then "graphs"]: { local bot_service(name, config, args=[], subcommand=null, mount_segments=false) = {
image: $.get_image("graphs"),
restart: "always",
command: [$.donation_url_template, "--base-dir", "/mnt/graphs"],
volumes: ["%s:/mnt" % $.segments_path],
environment: $.env,
},
local bot_service(name, config, args=[], subcommand=null) = {
image: $.get_image("zulip_bots"), image: $.get_image("zulip_bots"),
restart: "always", restart: "always",
entrypoint: ["python3", "-m", "zulip_bots.%s" % name] entrypoint: ["python3", "-m", "zulip_bots.%s" % name]
@ -772,6 +764,7 @@
+ [std.manifestJson(config)] + [std.manifestJson(config)]
+ args, + args,
environment: $.env, environment: $.env,
[if mount_segments then "volumes"]: ["%s:/mnt" % $.segments_path],
}, },
[if $.enabled.schedulebot then "schedulebot"]: [if $.enabled.schedulebot then "schedulebot"]:
@ -787,7 +780,7 @@
[if $.enabled.tootbot then "tootbot"]: [if $.enabled.tootbot then "tootbot"]:
bot_service("tootbot", $.tootbot + { bot_service("tootbot", $.tootbot + {
zulip+: { url: $.zulip_url }, zulip+: { url: $.zulip_url },
}, $.tootbot.args, subcommand="main"), }, $.tootbot.args, subcommand="main", mount_segments=true),
[if $.enabled.twitchbot then "twitchbot"]: [if $.enabled.twitchbot then "twitchbot"]:
bot_service("twitchbot", $.twitchbot + { bot_service("twitchbot", $.twitchbot + {
@ -797,16 +790,12 @@
[if $.enabled.pubbot then "pubbot"]: [if $.enabled.pubbot then "pubbot"]:
bot_service("pubbot", $.pubbot + { bot_service("pubbot", $.pubbot + {
zulip_url: $.zulip_url, zulip_url: $.zulip_url,
}, ["/mnt/pubnub-log.json"]) + { }, ["/mnt/pubnub-log.json"], mount_segments=true),
volumes: ["%s:/mnt" % $.segments_path],
},
[if $.enabled.blogbot then "blogbot"]: [if $.enabled.blogbot then "blogbot"]:
bot_service("blogbot", $.blogbot + { bot_service("blogbot", $.blogbot + {
zulip_url: $.zulip_url, zulip_url: $.zulip_url,
}, ["--save-dir", "/mnt/blogs"]) + { }, ["--save-dir", "/mnt/blogs"], mount_segments=true),
volumes: ["%s:/mnt" % $.segments_path],
},
[if $.enabled.prizebot then "prizebot"]: [if $.enabled.prizebot then "prizebot"]:
bot_service("prizebot", $.prizebot + { bot_service("prizebot", $.prizebot + {

@ -15,6 +15,7 @@ import gevent.backdoor
import gevent.event import gevent.event
import prometheus_client as prom import prometheus_client as prom
import requests import requests
import requests.adapters
from monotonic import monotonic from monotonic import monotonic
import common import common
@ -49,6 +50,24 @@ ad_segments_ignored = prom.Counter(
["channel", "quality"], ["channel", "quality"],
) )
suspicious_skew_count = prom.Counter(
"suspicious_skew_count",
"Number of times we've restarted a worker due to suspicious skew",
["channel", "quality"],
)
segment_time_skew_non_zero_sum = prom.Gauge(
"segment_time_skew_non_zero_sum",
"Sum of all observed segment skew amounts for worker",
["channel", "quality", "worker"],
)
segment_time_skew_non_zero_count = prom.Counter(
"segment_time_skew_non_zero_count",
"Count of segments with non-zero skew for worker",
["channel", "quality", "worker"],
)
class TimedOutError(Exception): class TimedOutError(Exception):
pass pass
@ -122,7 +141,7 @@ class StreamsManager(object):
FETCH_TIMEOUTS = 5, 30 FETCH_TIMEOUTS = 5, 30
def __init__(self, provider, channel, base_dir, qualities, important=False): def __init__(self, provider, channel, base_dir, qualities, important=False, history_size=0):
self.provider = provider self.provider = provider
self.channel = channel self.channel = channel
self.logger = logging.getLogger("StreamsManager({})".format(channel)) self.logger = logging.getLogger("StreamsManager({})".format(channel))
@ -203,7 +222,7 @@ class StreamsManager(object):
self.logger.info("Ignoring worker start as we are stopping") self.logger.info("Ignoring worker start as we are stopping")
return return
url_time, url = self.latest_urls[quality] url_time, url = self.latest_urls[quality]
worker = StreamWorker(self, quality, url, url_time) worker = StreamWorker(self, quality, url, url_time, self.history_size)
self.stream_workers[quality].append(worker) self.stream_workers[quality].append(worker)
gevent.spawn(worker.run) gevent.spawn(worker.run)
@ -290,7 +309,12 @@ class StreamWorker(object):
FETCH_RETRY_INTERVAL = 1 FETCH_RETRY_INTERVAL = 1
FETCH_POLL_INTERVAL = 2 FETCH_POLL_INTERVAL = 2
def __init__(self, manager, quality, url, url_time): # Max difference between a segment's time + duration and the next segment's time (in seconds)
# before we consider this "suspicious" and trigger a refresh.
# See https://github.com/dbvideostriketeam/wubloader/issues/539
MAX_SEGMENT_TIME_SKEW = 0.01
def __init__(self, manager, quality, url, url_time, history_size):
self.manager = manager self.manager = manager
self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self))) self.logger = manager.logger.getChild("StreamWorker({})@{:x}".format(quality, id(self)))
self.quality = quality self.quality = quality
@ -305,11 +329,16 @@ class StreamWorker(object):
# This worker's SegmentGetters will use its session by default for performance, # This worker's SegmentGetters will use its session by default for performance,
# but will fall back to a new one if something goes wrong. # but will fall back to a new one if something goes wrong.
self.session = common.requests.InstrumentedSession() self.session = common.requests.InstrumentedSession()
adapter = requests.adapters.HTTPAdapter(pool_maxsize=100)
self.session.mount('https://', adapter)
# Map cache is a simple cache to avoid re-downloading the same map URI for every segment, # Map cache is a simple cache to avoid re-downloading the same map URI for every segment,
# since it's generally the same but may occasionally change. # since it's generally the same but may occasionally change.
# We expect the map data to be very small so there is no eviction here. # We expect the map data to be very small so there is no eviction here.
# {uri: data} # {uri: data}
self.map_cache = {} self.map_cache = {}
# If enabled, playlist history is saved after each segment fetch,
# showing the last N playlist fetches up until the one that resulted in that fetch.
self.history_size = history_size
def __repr__(self): def __repr__(self):
return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality) return "<{} at 0x{:x} for stream {!r}>".format(type(self).__name__, id(self), self.quality)
@ -347,12 +376,15 @@ class StreamWorker(object):
def _run(self): def _run(self):
first = True first = True
suspicious_skew = False
history = []
while not self.stopping.is_set(): while not self.stopping.is_set():
self.logger.debug("Getting media playlist {}".format(self.url)) self.logger.debug("Getting media playlist {}".format(self.url))
try: try:
with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker): with soft_hard_timeout(self.logger, "getting media playlist", self.FETCH_TIMEOUTS, self.trigger_new_worker):
playlist = self.manager.provider.get_media_playlist(self.url, session=self.session) playlist_time = datetime.datetime.utcnow()
raw_playlist, playlist = self.manager.provider.get_media_playlist(self.url, session=self.session)
except Exception as e: except Exception as e:
self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True) self.logger.warning("Failed to fetch media playlist {}".format(self.url), exc_info=True)
self.trigger_new_worker() self.trigger_new_worker()
@ -368,8 +400,12 @@ class StreamWorker(object):
# We successfully got the playlist at least once # We successfully got the playlist at least once
first = False first = False
if self.history_size > 0:
history = [(playlist_time, raw_playlist)] + history[:self.history_size]
# Start any new segment getters # Start any new segment getters
date = None # tracks date in case some segment doesn't include it date = None # tracks date in case some segment doesn't include it
prev_segment = None
for segment in playlist.segments: for segment in playlist.segments:
if segment.ad_reason: if segment.ad_reason:
self.logger.info("Ignoring ad segment: {}".format(segment.ad_reason)) self.logger.info("Ignoring ad segment: {}".format(segment.ad_reason))
@ -380,7 +416,26 @@ class StreamWorker(object):
self.manager.mark_working(self) self.manager.mark_working(self)
if segment.date: if segment.date:
date = common.dateutil.parse(segment.date) new_date = common.dateutil.parse(segment.date)
if date is not None:
# We have observed an issue on twitch where there will be a small time jump
# (eg. with 2s segments, times will be 00:10.3, 00:12.3, 00:14.7, 00:16.7)
# and all subsequent segment timestamps will be out by this amount compared
# to what other downloader instances see. Our workaround for this issue is to
# watch for such gaps and:
# 1. trigger a worker refresh (which seems to fix the issue)
# 2. treat all subsequent segments as "suspect" so they are still saved
# but only used if no other source is available.
skew = (date - new_date).total_seconds()
if skew != 0:
segment_time_skew_non_zero_sum.labels(self.manager.channel, self.quality, f"{id(self):x}").inc(skew)
segment_time_skew_non_zero_count.labels(self.manager.channel, self.quality, f"{id(self):x}").inc()
if abs(skew) > self.MAX_SEGMENT_TIME_SKEW and not suspicious_skew:
self.logger.warning(f"Suspicious skew of {skew}, triggering new worker: Expected {date} after {prev_segment}, got {new_date} for {segment}")
self.trigger_new_worker()
suspicious_skew = True
suspicious_skew_count.labels(self.manager.channel, self.quality).inc()
date = new_date
if segment.uri not in self.getters: if segment.uri not in self.getters:
if date is None: if date is None:
raise ValueError("Cannot determine date of segment") raise ValueError("Cannot determine date of segment")
@ -392,11 +447,14 @@ class StreamWorker(object):
self.quality, self.quality,
segment, segment,
date, date,
suspicious_skew,
self.map_cache, self.map_cache,
history,
) )
gevent.spawn(self.getters[segment.uri].run) gevent.spawn(self.getters[segment.uri].run)
if date is not None: if date is not None:
date += datetime.timedelta(seconds=segment.duration) date += datetime.timedelta(seconds=segment.duration)
prev_segment = segment
# Clean up any old segment getters. # Clean up any old segment getters.
# Note use of list() to make a copy to avoid modification-during-iteration # Note use of list() to make a copy to avoid modification-during-iteration
@ -448,19 +506,21 @@ class SegmentGetter(object):
# or so, to be paranoid we set it to considerably longer than that. # or so, to be paranoid we set it to considerably longer than that.
GIVE_UP_TIMEOUT = 20 * 60 GIVE_UP_TIMEOUT = 20 * 60
def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, map_cache): def __init__(self, parent_logger, session, base_dir, channel, quality, segment, date, suspect, map_cache, history):
self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self))) self.logger = parent_logger.getChild("SegmentGetter@{:x}".format(id(self)))
self.base_dir = base_dir self.base_dir = base_dir
self.channel = channel self.channel = channel
self.quality = quality self.quality = quality
self.segment = segment self.segment = segment
self.date = date self.date = date
self.suspect = suspect
self.map_cache = map_cache self.map_cache = map_cache
self.prefix = self.make_path_prefix() self.prefix = self.make_path_prefix()
self.retry = None # Event, set to begin retrying self.retry = None # Event, set to begin retrying
self.done = gevent.event.Event() # set when file exists or we give up self.done = gevent.event.Event() # set when file exists or we give up
# Our parent's connection pool, but we'll replace it if there's any issues # Our parent's connection pool, but we'll replace it if there's any issues
self.session = session self.session = session
self.history = history
def run(self): def run(self):
try: try:
@ -517,6 +577,12 @@ class SegmentGetter(object):
Type may be: Type may be:
full: Segment is complete. Hash is included. full: Segment is complete. Hash is included.
suspect: Segment appears to be complete, but we suspect it is not. Hash is included. suspect: Segment appears to be complete, but we suspect it is not. Hash is included.
This currently triggers on two conditions:
- If a segment takes a very long time to download, which we've observed to result in
partial files even though they appeared to end correctly.
- If the StreamWorker has encountered a time gap, then we suspect this segment to be
mis-timed. We have observed this where there is a small (~0.5s) time jump, then
all segments are consistently off by that amount compared to other nodes until refresh.
partial: Segment is incomplete. Hash is included. partial: Segment is incomplete. Hash is included.
temp: Segment has not been downloaded yet. A random uuid is added. temp: Segment has not been downloaded yet. A random uuid is added.
""" """
@ -601,7 +667,9 @@ class SegmentGetter(object):
raise e raise e
else: else:
request_duration = monotonic() - start_time request_duration = monotonic() - start_time
segment_type = "full" if request_duration < self.FETCH_SUSPECT_TIME else "suspect" segment_type = "full"
if self.suspect or request_duration >= self.FETCH_SUSPECT_TIME:
segment_type = "suspect"
full_path = self.make_path(segment_type, hash) full_path = self.make_path(segment_type, hash)
self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path)) self.logger.debug("Saving completed segment {} as {}".format(temp_path, full_path))
common.rename(temp_path, full_path) common.rename(temp_path, full_path)
@ -612,6 +680,20 @@ class SegmentGetter(object):
stat = latest_segment.labels(channel=self.channel, quality=self.quality) stat = latest_segment.labels(channel=self.channel, quality=self.quality)
timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds() timestamp = (self.date - datetime.datetime(1970, 1, 1)).total_seconds()
stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe stat.set(max(stat._value.get(), timestamp)) # NOTE: not thread-safe but is gevent-safe
if self.history:
self.write_history(full_path)
def write_history(self, segment_path):
segment_path = os.path.relpath(segment_path, self.base_dir)
history_path = os.path.join(self.base_dir, "playlist-debug", segment_path)
try:
os.makedirs(history_path)
except FileExistsError:
pass
for n, (timestamp, playlist) in enumerate(self.history):
filename = "{}_{}".format(n, timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"))
path = os.path.join(history_path, filename)
common.atomic_write(path, playlist)
def parse_channel(channel): def parse_channel(channel):
@ -630,7 +712,7 @@ def parse_channel(channel):
"This affects retry interval, error reporting and monitoring. " "This affects retry interval, error reporting and monitoring. "
"Non-twitch URLs can also be given with the form CHANNEL[!]:TYPE:URL" "Non-twitch URLs can also be given with the form CHANNEL[!]:TYPE:URL"
) )
def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None): def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor_port=0, twitch_auth_file=None, playlist_debug=0):
qualities = qualities.split(",") if qualities else [] qualities = qualities.split(",") if qualities else []
twitch_auth_token = None twitch_auth_token = None
@ -651,7 +733,7 @@ def main(channels, base_dir=".", qualities="source", metrics_port=8001, backdoor
channel_qualities = ["source"] channel_qualities = ["source"]
else: else:
raise ValueError(f"Unknown type {type!r}") raise ValueError(f"Unknown type {type!r}")
manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important) manager = StreamsManager(provider, channel, base_dir, channel_qualities, important=important, history_size=playlist_debug)
managers.append(manager) managers.append(manager)
def stop(): def stop():

@ -29,7 +29,8 @@ class Provider:
session = InstrumentedSession() session = InstrumentedSession()
resp = session.get(uri, metric_name='get_media_playlist') resp = session.get(uri, metric_name='get_media_playlist')
resp.raise_for_status() resp.raise_for_status()
return hls_playlist.load(resp.text, base_uri=resp.url) playlist = resp.text
return playlist, hls_playlist.load(playlist, base_uri=resp.url)
class URLProvider(Provider): class URLProvider(Provider):
@ -139,6 +140,7 @@ class TwitchProvider(Provider):
"allow_audio_only": "true", "allow_audio_only": "true",
"allow_spectre": "false", "allow_spectre": "false",
"fast_bread": "true", "fast_bread": "true",
"supported_codecs": "av1,h265,h264",
"sig": sig, "sig": sig,
"token": token, "token": token,
}, },

@ -0,0 +1,168 @@
#!/bin/bash
set -eu
if [ "$#" -lt 2 ]; then
echo "USAGE: $0 NAME CONNINFO [LIMIT]"
echo "NAME should be a unique name for your node"
echo "CONNINFO should be a postgres connection url like postgresql://USER:PASS@HOSTNAME/DATABASE"
echo "Exits after doing LIMIT jobs, default unlimited. Use limit 0 if you just want to clean up after a crash without doing any jobs."
exit 1
fi
NAME="$1"
CONNINFO="$2"
LIMIT="${3:--1}"
WORKDIR=${WORKDIR:-.}
logcmd() {
echo "Running: $*" >&2
"$@"
}
db() {
psql -Atqbv ON_ERROR_STOP=on "$CONNINFO" "$@"
}
# Expects a url matching "scp://USER:PASS@HOST:PORT/PATH"
# Returns USER PASS HOST PORT PATH, assumes all but path contain no whitespace. Assumes no URL-encoded chars.
url_to_parts() {
parts=$(sed -E 's|scp://([^:@]+):([^@]+)@([^:]+):([0-9]+)/(.+)|\1 \2 \3 \4 \5|' <<<"$1")
if [ "$parts" == "$1" ]; then # no substitution
echo "Could not parse URL: $1" >&2
return 1
fi
echo "$parts"
}
url_to_filename() {
local user pass host port path name
parts=$(url_to_parts "$1")
read -r user pass host port path <<<"$parts"
name=$(basename "$path")
echo "$WORKDIR/$name"
}
download_file() {
local user pass host port path file
parts=$(url_to_parts "$1")
read -r user pass host port path <<<"$parts"
file=$(url_to_filename "$1")
logcmd sshpass -p "$pass" scp -P "$port" "$user@$host:$path" "$file"
}
upload_file() {
local user pass host port path file
parts=$(url_to_parts "$1")
read -r user pass host port path <<<"$parts"
file=$(url_to_filename "$1")
logcmd sshpass -p "$pass" scp -P "$port" "$file" "$user@$host:$path"
}
encode() {
local src dest args
src="$1"
dest="$2"
shift 2
args=()
for arg in "$@"; do
sub=$(sed "s|{SRC_FILE}|$src|g; s|{DEST_FILE}|$dest|g" <<<"$arg")
args+=("$sub")
done
logcmd ffmpeg -hide_banner -nostdin -y "${args[@]}"
}
quit_after_current() {
LIMIT=0
echo "Will quit when current job is finished"
}
trap quit_after_current TERM
existing=$(
db -v name="$NAME" <<-SQL
SELECT claimed_at, dest_url FROM encodes
WHERE claimed_by = :'name' AND dest_hash IS NULL
SQL
)
if [ -n "$existing" ]; then
echo "WARNING: The following files are already claimed by this node:"
echo "$existing"
echo
echo -n "This is likely due to a crash. Unclaim these files? [Y/n] > "
read -r resp
if [ "$resp" != "n" ]; then
db -v name="$NAME" <<-SQL
UPDATE encodes SET
claimed_by = NULL,
claimed_at = NULL
WHERE claimed_by = :'name' AND dest_hash IS NULL
SQL
fi
fi
while [ "$((LIMIT--))" -ne 0 ] ; do
echo "Checking for jobs"
claimed=$(
db -F ' ' -v name="$NAME" <<-SQL
UPDATE encodes SET
claimed_by = :'name',
claimed_at = now()
WHERE dest_url = (
SELECT dest_url FROM encodes
WHERE claimed_by IS NULL AND dest_hash IS NULL
LIMIT 1
)
RETURNING src_url, src_hash, dest_url
SQL
)
if [ -z "$claimed" ]; then
echo "No available jobs, will check again in 1min"
sleep 60
continue
fi
read -r src_url src_hash dest_url <<<"$claimed"
src_file=$(url_to_filename "$src_url")
dest_file=$(url_to_filename "$dest_url")
echo "Got task to encode $dest_file"
# Read encode args seperately as we need to split out the array.
# The following query outputs one row per arg, seperated by nul chars.
# readarray -d '' will read into the given array after splitting on nul chars.
readarray -td '' encode_args < <(
db -0 -v dest_url="$dest_url" <<-SQL
SELECT unnest(encode_args) FROM encodes
WHERE dest_url = :'dest_url'
SQL
)
if [ -f "$src_file" ]; then
if sha256sum --status -c - <<<"$src_hash $src_file"; then
echo "Source file already exists - skipping download"
else
echo "Existing source file does not match hash - assuming corrupt and re-downloading."
rm "$src_file"
fi
fi
if ! [ -f "$src_file" ]; then
echo "Downloading source file (no progress bar sorry, blame scp)"
download_file "$src_url"
echo "Checking source file checksum"
sha256sum --status -c - <<<"$src_hash $src_file"
fi
echo "Starting encode"
encode "$src_file" "$dest_file" "${encode_args[@]}"
echo "Encode complete, uploading output file (still no progress bar)"
upload_file "$dest_url"
echo "Calculating output hash and marking complete"
dest_hash=$(sha256sum "$dest_file" | cut -d' ' -f1)
# Re-setting claimed_by *should* be a no-op here but if something has
# gone wrong at least we'll know which node is writing.
db -v dest_url="$dest_url" -v dest_hash="$dest_hash" -v name="$NAME" <<-SQL
UPDATE encodes SET
dest_hash = :'dest_hash',
claimed_by = :'name',
finished_at = now()
WHERE dest_url = :'dest_url'
SQL
done

@ -1,9 +0,0 @@
FROM python:3.11
RUN pip install bokeh
COPY graphs /tmp/graphs
RUN pip install /tmp/graphs && rm -r /tmp/graphs
LABEL org.opencontainers.image.source https://github.com/dbvideostriketeam/wubloader
ENTRYPOINT ["python3", "-m", "graphs"]

@ -1,13 +0,0 @@
import gevent.monkey
gevent.monkey.patch_all()
import logging
import argh
from graphs.main import main
LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s"
logging.basicConfig(level='INFO', format=LOG_FORMAT)
argh.dispatch_command(main)

@ -1,180 +0,0 @@
import gevent.monkey
gevent.monkey.patch_all()
import datetime
import logging
import json
import os
import argh
import bokeh.plotting
import bokeh.models
import bokeh.palettes
import bokeh.settings
import numpy as np
import requests
def format_year(year):
if year > 10:
year += 2006
return 'DBfH {}'.format(year)
def parse_json(json_donations, start_date, end_hour=np.inf, every_five=True):
end_hour = float(end_hour)
times = []
donations = []
for entry in json_donations:
times.append(datetime.datetime(*entry[:5]).isoformat())
donations.append(entry[5])
times = np.array(times, dtype=np.datetime64)
donations = np.asarray(donations)
start_time = np.datetime64(start_date)
bustimes = np.array(times - start_time, dtype=np.int_)
trimmed_bustimes = bustimes[(bustimes <= 60 * 60 * end_hour) & (bustimes >= 0)]
trimmed_donations = donations[(bustimes <= 60 * 60 * end_hour) & (bustimes >= 0)]
if every_five:
five_bustimes = trimmed_bustimes[::5]
five_donations = trimmed_donations[::5]
return five_bustimes, five_donations
else:
return trimmed_bustimes, trimmed_donations
def load_previous_donations(start_end_times, donation_url_template, timeout):
all_years = {}
for year in start_end_times:
start, end = start_end_times[year]
if not end:
current_year = year
continue
url = donation_url_template.format(year, year)
logging.info('Loading {}'.format(url))
year_json = requests.get(url, timeout=timeout).json()
all_years[year] = parse_json(year_json, start, end, year >= 5)
return all_years, current_year
def all_years_donations_graph(start_end_times, all_years, current_year, current_json, base_dir):
logging.info('Generating all years donation graph')
p = bokeh.plotting.figure(x_axis_label='Bus Time', y_axis_label='Donations', x_range=(0, 60 * 60 * 172),
width=1280, height=720, active_scroll='wheel_zoom',
tools='pan,wheel_zoom,box_zoom,reset')
p.add_tools(bokeh.models.HoverTool(tooltips=[('', '$name'), ('Bustime', '@Bustime{00:00:00}'),
('Donations', '$@Donations{0,0.00}')]))
for year in start_end_times:
label = format_year(year)
if year != current_year:
times, donations = all_years[year]
line_width = 2
else:
times, donations = parse_json(current_json, start_end_times[year][0], every_five=False)
line_width = 3
model = bokeh.models.ColumnDataSource(data={'Bustime':times, 'Donations':donations})
p.line(x='Bustime', y='Donations', source=model, line_width=line_width,
line_color=bokeh.palettes.Category20[20][current_year - year],
legend_label=label, name=label)
p.xaxis.ticker = bokeh.models.AdaptiveTicker(mantissas=[60, 120, 300, 600, 1200, 3600, 7200, 10800, 43200, 86400], base=10000000)
p.xaxis.formatter = bokeh.models.NumeralTickFormatter(format='00:00:00')
p.yaxis.formatter = bokeh.models.NumeralTickFormatter(format='$0,0')
p.legend.location = "top_left"
p.legend.click_policy="hide"
output_path = os.path.join(base_dir, 'all_years_donations.html')
bokeh.plotting.output_file(filename=output_path, title='DBfH All Years Donations')
bokeh.plotting.save(p, filename=output_path)
logging.info('{} Saved'.format(output_path))
def shifts_graph(start_end_times, current_year, current_json, base_dir, shifts):
logging.info('Generating DBfH {} shifts graph'.format(current_year))
times, donations = parse_json(current_json, start_end_times[current_year][0], every_five=False)
start_hour = int(start_end_times[current_year][0][11:13])
hours = times / 3600 + start_hour
mod_hours = hours % 24
n_days = int(hours.max() / 24) + 1
p = bokeh.plotting.figure(x_axis_label='Hour of Day', y_axis_label='Donations', x_range=(0, 24 * 3600),
width=1280, height=720, active_scroll='wheel_zoom',
tools='pan,wheel_zoom,box_zoom,reset')
p.add_tools(bokeh.models.HoverTool(tooltips=[('', '$name'), ('Hour of Day', '@Hours{00:00:00}'),
('Donations', '$@Donations{0,0.00}')]))
for day in range(n_days):
for shift in shifts:
in_range = (hours >= day * 24 + shift[1]) & (hours <= day * 24 + shift[2])
hours_in_range = mod_hours[in_range]
if mod_hours[in_range].size:
if hours_in_range[-1] == 0.:
hours_in_range[-1] = 24
model = bokeh.models.ColumnDataSource(data={'Hours':hours_in_range * 3600, 'Donations':donations[in_range] - donations[in_range][0]})
p.line(x='Hours', y='Donations', source=model, line_color=bokeh.palettes.Category10[10][day],
line_width=2, legend_label='Day {}'.format(day + 1), name='Day {} {}'.format(day + 1, shift[0]))
p.xaxis.ticker = bokeh.models.AdaptiveTicker(mantissas=[60, 120, 300, 600, 1200, 3600, 7200, 10800, 43200, 86400], base=10000000)
p.xaxis.formatter = bokeh.models.NumeralTickFormatter(format='00:00:00')
p.yaxis.formatter = bokeh.models.NumeralTickFormatter(format='$0,0')
p.legend.location = "top_left"
p.legend.click_policy="hide"
output_path = os.path.join(base_dir, 'DBfH_{}_shifts_graph.html'.format(current_year))
bokeh.plotting.output_file(filename=output_path, title='{} Shift Donations'.format(format_year(current_year)))
bokeh.plotting.save(p, filename=output_path)
logging.info('{} Saved'.format(output_path))
@argh.arg('--base-dir', help='Directory where graphs are output. Default is current working directory.')
def main(donation_url_template, base_dir='.'):
stopping = gevent.event.Event()
logging.getLogger('bokeh').setLevel(logging.WARNING)
delay = 60 * 1
timeout = 15
shifts = [['Zeta Shift', 0, 6],
['Dawn Guard', 6, 12],
['Alpha Flight', 12, 18],
['Night Watch', 18, 24]]
# First load data required
logging.info('Loading start and end times')
start_end_path = os.path.join(base_dir, 'start_end_times.json')
start_end_times = json.load(open(start_end_path))
start_end_times = {int(year):start_end_times[year] for year in start_end_times}
all_years, current_year = load_previous_donations(start_end_times, donation_url_template, timeout)
current_url = donation_url_template.format(current_year, current_year)
while not stopping.is_set():
try:
logging.info('Loading {}'.format(current_url))
current_json = requests.get(current_url, timeout=timeout).json()
all_years_donations_graph(start_end_times, all_years, current_year, current_json, base_dir)
shifts_graph(start_end_times, current_year, current_json, base_dir, shifts)
except Exception:
logging.exception('Plotting failed. Retrying')
stopping.wait(delay)

@ -1,14 +0,0 @@
from setuptools import setup, find_packages
setup(
name = 'graphs',
version = '0.0.0',
packages = find_packages(),
install_requires = [
'argh',
'bokeh',
'gevent',
'numpy',
'requests'
],
)

@ -195,3 +195,24 @@ CREATE TABLE templates (
crop box_coords NOT NULL, crop box_coords NOT NULL,
location box_coords NOT NULL location box_coords NOT NULL
); );
-- Used to farm out encoding jobs to encoder workers.
-- URL fields must match form: "scp://USER:PASS@HOST:PORT/PATH"
-- Hash fields are hex strings containing sha256 hashes.
-- encode_args should be passed verbatim to ffmpeg with the following substitutions:
-- {SRC_FILE}: The path to the source file
-- {DEST_FILE}: The path to the output file
-- Example encode args: '-i' '{SRC_FILE}' '-c' 'copy' '{DEST_FILE}'
-- The job is considered complete once the dest hash is written.
-- Jobs may be claimed by writing a worker name to claimed_by.
-- Timestamp fields are indicative only.
CREATE TABLE encodes (
src_url TEXT NOT NULL,
src_hash TEXT NOT NULL,
encode_args TEXT[] NOT NULL,
dest_url TEXT PRIMARY KEY,
dest_hash TEXT,
claimed_by TEXT,
claimed_at TIMESTAMP,
finished_at TIMESTAMP
);

@ -80,3 +80,15 @@ if [ -n "$BUSCRIBE_USER" ]; then
echo "Applying schema for $BUSCRIBE_DB" echo "Applying schema for $BUSCRIBE_DB"
sql "$BUSCRIBE_USER" -d "$BUSCRIBE_DB" < /buscribe.sql sql "$BUSCRIBE_USER" -d "$BUSCRIBE_DB" < /buscribe.sql
fi fi
if [ -n "$ENCODER_USER" ]; then
echo "Creating $ENCODER_USER"
echo "host all $ENCODER_USER all md5" >> "$PGDATA/pg_hba.conf"
sql "$POSTGRES_USER" <<-EOSQL
CREATE USER $ENCODER_USER WITH CONNECTION LIMIT 50 LOGIN PASSWORD '$ENCODER_PASSWORD';
GRANT CONNECT ON DATABASE $POSTGRES_DB TO $ENCODER_USER;
GRANT USAGE ON SCHEMA public TO $ENCODER_USER;
GRANT SELECT ON TABLE encodes TO $ENCODER_USER;
GRANT UPDATE ( dest_hash, claimed_by, claimed_at, finished_at ) ON TABLE encodes TO $ENCODER_USER;
EOSQL
fi

@ -91,10 +91,10 @@ def metrics():
"""Return current metrics in prometheus metrics format""" """Return current metrics in prometheus metrics format"""
return prom.generate_latest() return prom.generate_latest()
# To make nginx proxying simpler, we want to allow /metrics/* to work # To make nginx proxying simpler, we want to allow /metrics/restreamer to work
@app.route('/metrics/<trailing>') @app.route('/metrics/restreamer')
@request_stats @request_stats
def metrics_with_trailing(trailing): def metrics_with_trailing():
"""Expose Prometheus metrics.""" """Expose Prometheus metrics."""
return prom.generate_latest() return prom.generate_latest()

@ -157,7 +157,7 @@ def main(conf_file, message_log_file, name=socket.gethostname()):
logging.info("New donation total: {}{}{}".format(msg["d"], increase_str, entries_str)) logging.info("New donation total: {}{}{}".format(msg["d"], increase_str, entries_str))
total = msg["d"] total = msg["d"]
if increase is not None and increase > 0: if increase is not None and increase > 0:
client.send_to_stream("bot-spam", "Donation Firehose", "Donation total is now ${:.2f}{}{}".format(msg["d"], increase_str, entries_str)) client.send_to_stream("firehose", "Donations", "Donation total is now ${:.2f}{}{}".format(msg["d"], increase_str, entries_str))
if increase is not None and increase >= 500: if increase is not None and increase >= 500:
client.send_to_stream("bot-spam", "Notable Donations", "Large donation of ${:.2f} (total ${:.2f}){}".format(increase, msg['d'], entries_str)) client.send_to_stream("bot-spam", "Notable Donations", "Large donation of ${:.2f} (total ${:.2f}){}".format(increase, msg['d'], entries_str))

@ -13,6 +13,15 @@ from .config import get_config
cli = argh.EntryPoint() cli = argh.EntryPoint()
def try_save_image(media_dir, attachment):
if media_dir is None:
return {"error": "no media dir given"}
try:
return {"path": media.download_media(attachment["url"], media_dir)}
except media.Rejected as e:
return {"error": str(e)}
def format_account(account): def format_account(account):
return f"**[{account['display_name']}]({account['url']})**" return f"**[{account['display_name']}]({account['url']})**"
@ -167,11 +176,12 @@ LINE = "\n---"
class Listener(mastodon.StreamListener): class Listener(mastodon.StreamListener):
def __init__(self, zulip_client, stream, post_topic, notification_topic): def __init__(self, zulip_client, stream, post_topic, notification_topic, output_path, media_dir):
self.zulip_client = zulip_client self.zulip_client = zulip_client
self.stream = stream self.stream = stream
self.post_topic = post_topic self.post_topic = post_topic
self.notification_topic = notification_topic self.notification_topic = notification_topic
self.output_path = output_path
def send(self, topic, content): def send(self, topic, content):
logging.info(f"Sending message to {self.stream}/{topic}: {content!r}") logging.info(f"Sending message to {self.stream}/{topic}: {content!r}")
@ -179,25 +189,50 @@ class Listener(mastodon.StreamListener):
def on_update(self, status): def on_update(self, status):
logging.info(f"Got update: {status!r}") logging.info(f"Got update: {status!r}")
self.output("update", status)
self.send(self.post_topic, format_status(status) + LINE) self.send(self.post_topic, format_status(status) + LINE)
def on_delete(self, status_id): def on_delete(self, status_id):
logging.info(f"Got delete: {status_id}") logging.info(f"Got delete: {status_id}")
self.output("delete", payload=status_id)
self.send(self.post_topic, f"*Status with id {status_id} was deleted*") self.send(self.post_topic, f"*Status with id {status_id} was deleted*")
def on_status_update(self, status): def on_status_update(self, status):
logging.info(f"Got status update: {status!r}") logging.info(f"Got status update: {status!r}")
self.output("status_update", status)
self.send(self.post_topic, f"*The following status has been updated*\n{format_status(status)}" + LINE) self.send(self.post_topic, f"*The following status has been updated*\n{format_status(status)}" + LINE)
def on_notification(self, notification): def on_notification(self, notification):
logging.info(f"Got {notification['type']} notification: {notification!r}") logging.info(f"Got {notification['type']} notification: {notification!r}")
if notification["type"] != "mention": if notification["type"] != "mention":
return return
self.output("mention", notification["status"])
self.send(self.notification_topic, format_status(notification["status"]) + LINE) self.send(self.notification_topic, format_status(notification["status"]) + LINE)
def output(self, type, status=None, payload=None):
if self.output_path is None:
return
data = {"type": type}
if status is not None:
data["status"] = status
data["attachments"] = [
try_save_image(self.media_dir, attachment)
for attachment in status["media_attachments"]
]
if payload is not None:
data["payload"] = payload
data = json.dumps(data)
with open(os.path.join(self.output_path, "messages.json"), "a") as f:
f.write(data + "\n")
@cli @cli
def main(conf_file, stream="bot-spam", post_topic="Toots from Desert Bus", notification_topic="Mastodon Notifications"): def main(
conf_file,
stream="bot-spam",
post_topic="Toots from Desert Bus",
notification_topic="Mastodon Notifications",
):
""" """
Run the actual bot. Run the actual bot.
@ -211,6 +246,8 @@ def main(conf_file, stream="bot-spam", post_topic="Toots from Desert Bus", notif
client_id # only required for get-access-token client_id # only required for get-access-token
client_secret # only required for get-access-token client_secret # only required for get-access-token
access_token # only required for main access_token # only required for main
output_path # optional
media_dir # optional
""" """
logging.basicConfig(level='INFO') logging.basicConfig(level='INFO')
@ -220,7 +257,7 @@ def main(conf_file, stream="bot-spam", post_topic="Toots from Desert Bus", notif
zulip_client = zulip.Client(zc["url"], zc["email"], zc["api_key"]) zulip_client = zulip.Client(zc["url"], zc["email"], zc["api_key"])
mastodon_client = mastodon.Mastodon(api_base_url=mc["url"], access_token=mc["access_token"]) mastodon_client = mastodon.Mastodon(api_base_url=mc["url"], access_token=mc["access_token"])
listener = Listener(zulip_client, stream, post_topic, notification_topic) listener = Listener(zulip_client, stream, post_topic, notification_topic, conf.get("output_path"), conf.get("media_dir"))
RETRY_INTERVAL = 1 RETRY_INTERVAL = 1

@ -11,6 +11,30 @@ from common.googleapis import GoogleAPIClient
from .config import get_config from .config import get_config
from .zulip import Client from .zulip import Client
_video_title_cache = {}
def get_video_title(google, video_id):
if video_id not in _video_title_cache:
_video_title_cache[video_id] = _get_video_title(google, video_id)
return _video_title_cache[video_id]
def _get_video_title(google, video_id):
resp = google.request("GET",
"https://www.googleapis.com/youtube/v3/videos",
params={
"part": "snippet",
"id": video_id,
}
)
resp.raise_for_status()
items = resp.json()["items"]
if not items:
# This should be rare, and it's ok to just crash
raise Exception(f"Request for video id {video_id} returned no results, was it deleted?")
return items[0]["snippet"]["title"]
def get_comments(google, channel_id): def get_comments(google, channel_id):
resp = google.request("GET", resp = google.request("GET",
"https://www.googleapis.com/youtube/v3/commentThreads", "https://www.googleapis.com/youtube/v3/commentThreads",
@ -39,10 +63,12 @@ def get_comments(google, channel_id):
return comments return comments
def show_comment(zulip, stream, topic, comment): def show_comment(zulip, google, stream, topic, comment):
title = get_video_title(google, comment["videoId"])
c = comment["snippet"] c = comment["snippet"]
author = f"[{c['authorDisplayName']}]({c['authorChannelUrl']})" author = f"[{c['authorDisplayName']}]({c['authorChannelUrl']})"
video = f"https://youtu.be/{comment['videoId']}" url = f"https://youtu.be/{comment['videoId']}"
video = f"[{title}]({url})"
message = f"{author} commented on {video}:\n```quote\n{c['textDisplay']}\n```" message = f"{author} commented on {video}:\n```quote\n{c['textDisplay']}\n```"
logging.info(f"Sending message to {stream}/{topic}: {message!r}") logging.info(f"Sending message to {stream}/{topic}: {message!r}")
# Empty stream acts as a dry-run mode # Empty stream acts as a dry-run mode
@ -92,7 +118,7 @@ def main(conf_file, interval=60, one_off=0, stream="bot-spam", topic="Youtube Co
if comment["id"] in seen: if comment["id"] in seen:
logging.debug(f"Comment {comment['id']} already seen, skipping") logging.debug(f"Comment {comment['id']} already seen, skipping")
continue continue
show_comment(zulip, stream, topic, comment) show_comment(zulip, google, stream, topic, comment)
seen.append(comment["id"]) seen.append(comment["id"])
seen = seen[-keep:] seen = seen[-keep:]

Loading…
Cancel
Save