Compare commits

...

9 Commits

Author SHA1 Message Date
Mike Lang 7483794a23 docker-compose: Set up sheetsync for downloading media links
This involves giving it access to the SEGMENTS/media directory.
2 months ago
Mike Lang 3e7cb38cf0 sheetsync: Optionally download media linked in image links column
To enable this, you need to:
- set --media-dir globally for sheetsync
- enable download_media=true for the events sync config
To disable for individual rows (eg. because of known issues), put "[nodownload]" in the notes column.
2 months ago
Mike Lang 7b590cf574 chat-archiver: Some cleanups to the URL matching regex
With thanks to Me-Me for review
2 months ago
Mike Lang 15f86551d4 docker-compose: Backfill of media, with or without chat archiver 2 months ago
Mike Lang 9dfb00f4ab chat_archiver: Logic for checking and downloading media links 2 months ago
Mike Lang 2855ec759d download_media: Add pdf to default allowed content types
We want to capture linked PDFs in addition to videos and images
2 months ago
Mike Lang b46c577014 download_media: Add function for checking if a URL has been downloaded before 2 months ago
Mike Lang 352c9e9081 download_media: Get data from potentially malicious URLs and store in the filesystem
This is suitable for taking arbitrary URLs from chat, etc. and trying to fetch them.
It downloads them to a filepath that contains a hash of the URL and content.
2 months ago
Mike Lang 07055e3605 chat-archiver: extract the ensure_emotes greenlet management to a class 2 months ago

@ -4,7 +4,6 @@ gevent.monkey.patch_all()
import argh
import logging
import json
from chat_archiver.main import ensure_emotes, wait_for_ensure_emotes

@ -0,0 +1,27 @@
"""A group of greenlets running tasks.
Each task has a key. If a task with that key is already running,
it is not re-run."""
import gevent


class KeyedGroup:
    def __init__(self):
        self.greenlets = {}

    def spawn(self, key, func, *args, **kwargs):
        if key not in self.greenlets:
            self.greenlets[key] = gevent.spawn(self._wrapper, key, func, *args, **kwargs)
        return self.greenlets[key]

    def wait(self):
        """Blocks until all tasks started before wait() was called are finished."""
        gevent.wait(self.greenlets.values())

    def _wrapper(self, key, func, *args, **kwargs):
        try:
            return func(*args, **kwargs)
        finally:
            assert self.greenlets[key] is gevent.getcurrent()
            del self.greenlets[key]
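As a usage sketch (illustrative only; the import path and keys are assumptions, not part of the commit), spawning the same key twice runs the task once, and wait() blocks until everything started so far has finished:

# Hypothetical usage of KeyedGroup; import path assumed from the chat_archiver package layout.
from chat_archiver.keyed_group import KeyedGroup

def fetch(name):
    print("fetching", name)

group = KeyedGroup()
group.spawn(("emote", "123"), fetch, "emote 123")
group.spawn(("emote", "123"), fetch, "emote 123")  # same key: reuses the in-flight greenlet, fetch runs once
group.spawn(("emote", "456"), fetch, "emote 456")
group.wait()  # blocks until both distinct tasks have finished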

@ -5,6 +5,7 @@ import json
import logging
import os
import random
import re
import string
import signal
import socket
@ -19,12 +20,15 @@ import gevent.queue
from common import atomic_write, listdir
from common.chat import BATCH_INTERVAL, format_batch, get_batch_files, merge_messages
from common.media import download_media, FailedResponse, WrongContent, Rejected
from girc import Client
from monotonic import monotonic
import prometheus_client as prom
import requests
from .keyed_group import KeyedGroup
# These are known to arrive up to MAX_DELAY after their actual time
DELAYED_COMMANDS = [
@ -113,12 +117,13 @@ merge_pass_merges = prom.Histogram(
)
class Archiver(object):
def __init__(self, name, base_dir, channels, nick, oauth_token):
def __init__(self, name, base_dir, channels, nick, oauth_token, download_media):
self.logger = logging.getLogger(type(self).__name__).getChild(name)
self.name = name
self.messages = gevent.queue.Queue()
self.channels = channels
self.base_dir = base_dir
self.download_media = download_media
self.stopping = gevent.event.Event()
self.got_reconnect = gevent.event.Event()
@ -250,6 +255,9 @@ class Archiver(object):
emote_ids = [emote_spec.split(':')[0] for emote_spec in emote_specs]
ensure_emotes(self.base_dir, emote_ids)
if self.download_media and data['command'] == "PRIVMSG" and len(data["params"]) == 2:
ensure_image_links(self.base_dir, data["params"][1])
if data['tags'] and 'tmi-sent-ts' in data['tags']:
# explicit server time is available
timestamp = int(data['tags']['tmi-sent-ts']) / 1000. # original is int ms
@ -306,47 +314,108 @@ class Archiver(object):
self.client.stop()
_EMOTES_RUNNING = {} # map (base_dir, emote id, theme, scale) -> in-progress greenlet fetching that path
_EMOTES_RUNNING = KeyedGroup()
def ensure_emotes(base_dir, emote_ids):
"""Tries to download given emote from twitch if it doesn't already exist.
This happens in the background and errors are ignored.
"""
def _ensure_emote(emote_id, theme, scale):
url = "https://static-cdn.jtvnw.net/emoticons/v2/{}/default/{}/{}".format(emote_id, theme, scale)
path = os.path.join(base_dir, "emotes", emote_id, "{}-{}".format(theme, scale))
if os.path.exists(path):
logging.debug("Emote {} already exists".format(path))
return
logging.info("Fetching emote from {}".format(url))
try:
url = "https://static-cdn.jtvnw.net/emoticons/v2/{}/default/{}/{}".format(emote_id, theme, scale)
path = os.path.join(base_dir, "emotes", emote_id, "{}-{}".format(theme, scale))
if os.path.exists(path):
logging.debug("Emote {} already exists".format(path))
return
logging.info("Fetching emote from {}".format(url))
try:
response = requests.get(url)
except Exception:
logging.warning("Exception while fetching emote from {}".format(url), exc_info=True)
return
if not response.ok:
logging.warning("Error {} while fetching emote from {}: {}".format(response.status_code, url, response.text))
return
atomic_write(path, response.content)
logging.info("Saved emote {}".format(path))
finally:
assert _EMOTES_RUNNING[base_dir, emote_id, theme, scale] is gevent.getcurrent()
del _EMOTES_RUNNING[base_dir, emote_id, theme, scale]
response = requests.get(url)
except Exception:
logging.warning("Exception while fetching emote from {}".format(url), exc_info=True)
return
if not response.ok:
logging.warning("Error {} while fetching emote from {}: {}".format(response.status_code, url, response.text))
return
atomic_write(path, response.content)
logging.info("Saved emote {}".format(path))
for emote_id in emote_ids:
for theme in ('light', 'dark'):
for scale in ('1.0', '2.0', '3.0'):
# to prevent downloading the same emote twice because the first download isn't finished yet,
# check if it's already running.
# use a KeyedGroup.
key = base_dir, emote_id, theme, scale
if key not in _EMOTES_RUNNING:
_EMOTES_RUNNING[key] = gevent.spawn(_ensure_emote, emote_id, theme, scale)
else:
logging.debug("Skipping checking emote with key {} - already running".format(key))
_EMOTES_RUNNING.spawn(key, _ensure_emote, emote_id, theme, scale)
def wait_for_ensure_emotes():
gevent.wait(_EMOTES_RUNNING.values())
_EMOTES_RUNNING.wait()
URL_REGEX = re.compile(r"""
# Previous char is not a letter. This prevents eg. "foohttp://example.com"
# Also disallows / as the previous character, otherwise "file:///foo.bar/baz"
# can match on the "foo.bar/baz" part.
(?<! [\w/] )
# optional scheme, which must be http or https (we don't want other schemes)
(?P<scheme> https?:// )?
# Hostname, which must contain a dot. Single-part hostnames like "localhost" are valid
# but we don't want to match them, and this avoids cases like "yes/no" matching.
# We enforce that the TLD is not fully numeric. No TLDs currently look like this
# (though it does end up forbidding raw IPv4 addresses), and a common false-positive
# is "1.5/10" or similar.
( [a-z0-9-]+ \. )+ [a-z][a-z0-9-]+
# Optional port
( : [0-9]+ )?
# Optional path. We assume a path character can be anything that's not completely disallowed
# but don't try to parse it further into query, fragment etc.
# We also include all unicode characters considered "letters" since it's likely someone might
# put a ö or something in a path and copy-paste it from their browser URL bar which renders it
# like that even though it's encoded when actually sent as a URL.
# Restricting this to letters prevents things like non-breaking spaces causing problems.
# For the same reason we also allow {} and [] which seem to show up often in paths.
(?P<path> / [\w!#$%&'()*+,./:;=?@_~{}\[\]-]* )?
""", re.VERBOSE | re.IGNORECASE)
_IMAGE_LINKS_RUNNING = KeyedGroup()
def ensure_image_links(base_dir, text):
"""Find any image or video links in the text and download them if we don't have them already.
This happens in the background and errors are ignored."""
media_dir = os.path.join(base_dir, "media")
def get_url(url):
try:
try:
download_media(url, media_dir)
except FailedResponse:
# We got a 404 or similar.
# Attempt to remove any stray punctuation from the url and try again.
# We only try this once.
if url.endswith("..."):
url = url[:-3]
elif not url[-1].isalnum():
url = url[:-1]
else:
# No punctuation found, let the original result stand
raise
download_media(url, media_dir)
except WrongContent as e:
logging.info(f"Ignoring non-media link {url}: {e}")
except Rejected as e:
logging.warning(f"Rejected dangerous link {url}: {e}")
except Exception:
logging.warning(f"Unable to fetch link {url}", exc_info=True)
for match in URL_REGEX.finditer(text):
# Don't match on bare hostnames with no scheme AND no path. ie.
# http://example.com SHOULD match
# example.com/foo SHOULD match
# example.com SHOULD NOT match
# Otherwise we get a false positive every time someone says "fart.wav" or similar.
if match.group("scheme") is None and match.group("path") is None:
continue
url = match.group(0)
key = (media_dir, url)
_IMAGE_LINKS_RUNNING.spawn(key, get_url, url)
def write_batch(path, batch_time, messages, size_histogram=None):
@ -457,7 +526,7 @@ def merge_batch_files(path, batch_time):
os.remove(batch_file)
def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008):
def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_interval=60, metrics_port=8008, download_media=False):
with open(oauth_token_path) as f:
oauth_token = f.read()
# To ensure uniqueness even if multiple instances are running on the same host,
@ -483,7 +552,7 @@ def main(nick, oauth_token_path, *channels, base_dir='/mnt', name=None, merge_in
logging.info("Starting")
for index in count():
# To ensure uniqueness between clients, include a client number
archiver = Archiver("{}.{}".format(name, index), base_dir, channels, nick, oauth_token)
archiver = Archiver("{}.{}".format(name, index), base_dir, channels, nick, oauth_token, download_media)
archive_worker = gevent.spawn(archiver.run)
workers = mergers + [archive_worker]
# wait for either graceful exit, error, or for a signal from the archiver that a reconnect was requested
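As a quick sanity check of URL_REGEX and the scheme/path guard in ensure_image_links (illustrative only; the import path is an assumption):

# Which chat strings would be treated as downloadable links, per the rules commented above.
from chat_archiver.main import URL_REGEX  # assumed import path

def would_download(text):
    for match in URL_REGEX.finditer(text):
        # mirror the guard in ensure_image_links: require a scheme or a path
        if match.group("scheme") is None and match.group("path") is None:
            continue
        yield match.group(0)

assert list(would_download("see http://example.com")) == ["http://example.com"]  # scheme present
assert list(would_download("see example.com/foo")) == ["example.com/foo"]        # path present
assert list(would_download("just example.com")) == []  # bare hostname: skipped
assert list(would_download("lol fart.wav")) == []       # filename-looking text: skipped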

@ -0,0 +1,247 @@
import json
import logging
import os
import re
import socket
import time
import urllib.parse
from base64 import b64encode
from hashlib import sha256
from uuid import uuid4
import gevent
import prometheus_client as prom
import urllib3.connection
from ipaddress import ip_address
from . import atomic_write, ensure_directory, jitter, listdir
from .stats import timed
media_bytes_downloaded = prom.Counter(
"media_bytes_downloaded",
"Number of bytes of media files downloaded. Includes data downloaded then later rejected.",
)
media_bytes_saved = prom.Histogram(
"media_bytes_saved",
"Size in bytes of downloaded media that was successfully saved",
["content_type"],
buckets = [2**n for n in range(11, 27, 2)],
)
media_already_exists = prom.Counter(
"media_already_exists",
"Count of times we downloaded a file but it already existed",
)
class Rejected(Exception):
"""Indicates a non-retryable failure due to the url response violating our constraints"""
class TooLarge(Rejected):
"""Response was too large"""
class ForbiddenDestination(Rejected):
"""Hostname resolved to non-global IP"""
class BadScheme(Rejected):
"""Bad url scheme"""
class WrongContent(Rejected):
"""Response was not a video or image"""
class FailedResponse(Rejected):
"""Got a 4xx response, probably a bad link"""
def check_for_media(output_dir, url):
"""Returns True if we have at least one version of content for the given url already."""
url_dir = get_url_dir(output_dir, url)
return any(filename.endswith(".metadata.json") for filename in listdir(url_dir))
@timed()
def download_media(
url,
output_dir,
max_size=128*2**20, # 128MiB
timeout=60,
content_types=("image", "video", "application/pdf"),
max_redirects=5,
retries=3,
retry_interval=1,
chunk_size=64*1024, # 64KiB
):
"""Make a GET request to a potentially malicious URL and download the content to file.
We check the following:
- That the host is a public IP
- That the response does not exceed given max size (default 128MB)
- That the content type is in the given list
(the list may contain exact types like "image/png" or categories like "image")
- That the whole thing doesn't take more than a timeout
Redirects *will* be followed but the follow-up requests must obey the same rules
(and do not reset the timeout).
We save the file to OUTPUT_DIR/URL_HASH/FILE_HASH.EXT where EXT is guessed from content-type.
We save additional metadata including the url and content type to OUTPUT_DIR/URL_HASH/FILE_HASH.metadata.json
Raises on any rule violation or non-200 response.
"""
# Stores a list of urls redirected to, latest is current.
urls = [url]
with gevent.Timeout(timeout):
for redirect_number in range(max_redirects):
errors = []
for retry in range(retries):
if retry > 0:
gevent.sleep(jitter(retry_interval))
try:
resp = _request(urls[-1], max_size, content_types)
new_url = resp.get_redirect_location()
if new_url:
urls.append(new_url)
break # break from retry loop, continuing in the redirect loop
_save_response(output_dir, urls, resp, max_size, chunk_size)
return
except Rejected:
raise
except Exception as e:
errors.append(e)
# fall through to next retry loop
else:
# This block will be reached if range(retries) runs out but not via "break"
raise ExceptionGroup(f"All retries failed for url {urls[-1]}", errors)
raise Exception("Too many redirects")
def hash_to_path(hash):
return b64encode(hash.digest(), b"-_").decode().rstrip("=")
def get_url_dir(output_dir, url):
return os.path.join(output_dir, hash_to_path(sha256(url.encode())))
def _save_response(output_dir, urls, resp, max_size, chunk_size):
url_dir = get_url_dir(output_dir, urls[0])
temp_path = os.path.join(url_dir, f".{uuid4()}.temp")
ensure_directory(temp_path)
content_type = resp.headers["content-type"]
# Content type may have form "TYPE ; PARAMS", strip params if present.
# Also normalize for whitespace and case.
content_type = content_type.split(";")[0].strip().lower()
# We attempt to convert content type to an extension by taking the second part
# and stripping anything past the first character not in [a-z0-9-].
# So eg. "image/png" -> "png", "image/svg+xml" -> "svg", "image/../../../etc/password" -> ""
ext = content_type.split("/")[-1]
ext = re.match(r"^[a-z0-9.-]*", ext).group(0)
try:
length = 0
hash = sha256()
with open(temp_path, "wb") as f:
while True:
chunk = resp.read(chunk_size)
if not chunk:
break
hash.update(chunk)
length += len(chunk)
media_bytes_downloaded.inc(len(chunk))
if length > max_size:
raise TooLarge(f"Read more than {length} bytes from url {urls[-1]}")
f.write(chunk)
filename = f"{hash_to_path(hash)}.{ext}"
filepath = os.path.join(url_dir, filename)
# This is vulnerable to a race where two things create the file at once,
# but that's fine since it will always have the same content. This is just an optimization
# to avoid replacing the file over and over (and for observability)
if os.path.exists(filepath):
logging.info(f"Discarding downloaded file for {urls[0]} as it already exists")
media_already_exists.inc()
else:
os.rename(temp_path, filepath)
logging.info(f"Downloaded file for {urls[0]}")
media_bytes_saved.labels(content_type).observe(length)
finally:
if os.path.exists(temp_path):
os.remove(temp_path)
metadata_path = os.path.join(url_dir, f"{hash_to_path(hash)}.metadata.json")
# Again, this is racy but we don't care about double-writes.
# Note it's entirely possible for the image to already exist but still write the metadata,
# this can happen if a previous attempt crashed midway.
if not os.path.exists(metadata_path):
metadata = {
"url": urls[0],
"filename": filename,
"redirects": urls[1:],
"content_type": resp.headers["content-type"],
"fetched_by": socket.gethostname(),
"fetch_time": time.time(),
}
atomic_write(metadata_path, json.dumps(metadata, indent=4))
def _request(url, max_size, content_types):
"""Do the actual request and return a vetted response object, which is either the content
(status 200) or a redirect.
Raises Rejected if content fails checks, anything else should be considered retryable."""
parsed = urllib.parse.urlparse(url)
hostname = parsed.hostname
port = parsed.port
ip = socket.gethostbyname(hostname)
if not ip_address(ip).is_global:
raise ForbiddenDestination(f"Non-global IP {ip} for url {url}")
# In order to provide the host/ip to connect to separately from the URL,
# we need to drop to a fairly low-level interface.
if parsed.scheme == "http":
conn = urllib3.connection.HTTPConnection(ip, port or 80)
elif parsed.scheme == "https":
conn = urllib3.connection.HTTPSConnection(
ip, port or 443,
assert_hostname = hostname,
server_hostname = hostname,
)
else:
raise BadScheme(f"Bad scheme {parsed.scheme!r} for url {url}")
conn.request("GET", url, preload_content=False)
resp = conn.getresponse()
# Redirects do not require further checks
if resp.get_redirect_location():
return resp
# 4xx errors are non-retryable, anything else is.
if 400 <= resp.status < 500:
raise FailedResponse(f"Url returned {resp.status} response: {url}")
elif not (200 <= resp.status < 300):
raise Exception(f"Url returned {resp.status} response: {url}")
content_type = resp.getheader("content-type")
if content_type is None:
raise Exception(f"No content-type given for url {url}")
if not any(content_type.startswith(target) for target in content_types):
raise WrongContent(f"Disallowed content-type {content_type} for url {url}")
# If length is known but too large, reject early
length = resp.getheader("content-length")
if length is not None:
try:
length = int(length)
except ValueError:
raise Exception(f"Invalid content length {length!r} for url {url}")
if length > max_size:
raise TooLarge(f"Content length {length} is too large for url {url}")
return resp
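To make the intended call pattern concrete, a hedged sketch of a caller (the URL and directory are placeholders; the on-disk layout follows the docstring above):

# Illustrative caller, not part of the commit.
from common.media import check_for_media, download_media, FailedResponse, Rejected

url = "https://example.com/cat.png"
media_dir = "/mnt/media"

if not check_for_media(media_dir, url):  # skip if any version of this url was already saved
    try:
        # writes <media_dir>/<url hash>/<content hash>.png plus a .metadata.json alongside it
        download_media(url, media_dir)
    except FailedResponse:
        print("probably a dead link (4xx response)")
    except Rejected as e:
        print("refused by policy (scheme/destination/size/content-type):", e)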

@ -180,6 +180,15 @@
// Extra directories (besides segments) to backfill
backfill_dirs:: ["emotes"],
// Enable saving of media (images and videos - this can be large), either globally or split into
// three options:
// - From chat messages (in chat_archiver.download_media)
// - From the image links column in the sheet (using sheetsync)
// - Backfilled from other nodes
download_media:: true,
backfill_media:: $.download_media,
download_sheet_links:: $.download_media,
// The spreadsheet id and worksheet names for sheet sync to act on
// Set to null to disable syncing from sheets.
sheet_id:: "your_id_here",
@ -220,6 +229,8 @@
token_path: "./chat_token.txt",
// Whether to enable backfilling of chat archives to this node (if backfiller enabled)
backfill: true,
// Whether to enable downloading of media (images, videos, etc) that is posted in chat.
download_media: $.download_media,
// Channels to watch. Defaults to "all twitch channels in $.channels" but you can add extras.
channels: [
std.split(c, '!')[0]
@ -365,7 +376,10 @@
[
"--base-dir", "/mnt",
"--qualities", std.join(",", $.qualities + (if $.chat_archiver.backfill then ["chat"] else [])),
"--extras", std.join(",", $.backfill_dirs),
"--extras", std.join(",",
$.backfill_dirs
+ (if $.backfill_media then ["media"] else [])
),
"--static-nodes", std.join(",", $.peers),
"--backdoor-port", std.toString($.backdoor_port),
"--node-database", $.db_connect,
@ -456,6 +470,7 @@
worksheets: $.worksheets,
edit_url: $.edit_url,
bustime_start: $.bustime_start,
download_media: $.download_sheet_links,
},
sync_sheet_base + {
name+: "sheet-playlists",
@ -481,7 +496,7 @@
event_id: $.streamlog_event,
},
local sync_streamlog = [
sync_streamlog_base + {name: "streamlog-events", type: "events"},
sync_streamlog_base + {name: "streamlog-events", type: "events", download_media: $.download_sheet_links},
sync_streamlog_base + {name: "streamlog-playlists", type: "playlists"},
],
local config = (
@ -493,11 +508,15 @@
command: [
"--backdoor-port", std.toString($.backdoor_port),
$.db_connect,
] + std.map(std.manifestJson, config),
// Mount the creds file(s) into /etc
]
+ (if $.download_sheet_links then ["--media-dir=/mnt"] else [])
+ std.map(std.manifestJson, config),
volumes: std.prune([
// Mount the creds file(s) into /etc
if $.sheet_id != null then "%s:/etc/sheet-creds.json" % $.sheet_creds_file,
if $.streamlog_url != null then "%s:/etc/streamlog-token.txt" % $.streamlog_creds_file,
// Mount the segments media directory
if $.download_sheet_links then "%s/media:/mnt" % $.segments_path,
]),
// If the application crashes, restart it.
restart: "on-failure",
@ -614,7 +633,11 @@
[if $.enabled.chat_archiver then "chat_archiver"]: {
image: $.get_image("chat_archiver"),
restart: "always",
command: [$.chat_archiver.user, "/token"] + $.chat_archiver.channels + ["--name", $.localhost],
command:
[$.chat_archiver.user, "/token"]
+ $.chat_archiver.channels
+ ["--name", $.localhost]
+ (if $.chat_archiver.download_media then ["--download-media"] else []),
volumes: ["%s:/mnt" % $.segments_path, "%s:/token" % $.chat_archiver.token_path],
[if "chat_archiver" in $.ports then "ports"]: ["%s:8008" % $.ports.chat_archiver],
environment: $.env,

@ -3,6 +3,7 @@ import json
import logging
import signal
from collections import defaultdict
from urllib.parse import urlparse
import argh
import gevent.backdoor
@ -15,6 +16,7 @@ from requests import HTTPError
import common
import common.dateutil
from common.database import DBManager, query, get_column_placeholder
from common.media import check_for_media, download_media
from common.sheets import Sheets as SheetsClient
from .sheets import SheetsEventsMiddleware, SheetsPlaylistsMiddleware, SheetsArchiveMiddleware
@ -274,6 +276,11 @@ class EventsSync(SheetSync):
"category",
}
def __init__(self, name, middleware, stop, dbmanager, reverse_sync=False, media_dir=None):
super().__init__(name, middleware, stop, dbmanager, reverse_sync)
self.media_dir = media_dir
self.media_downloads = None if media_dir is None else {}
def observe_rows(self, rows):
counts = defaultdict(lambda: 0)
for row in rows:
@ -287,6 +294,25 @@ class EventsSync(SheetSync):
def sync_row(self, sheet_row, db_row):
# Do some special-case transforms for events before syncing
# Attempt to download any URLs in the links column if we don't already have them.
# This is done asynchronously. We keep a record of failed attempts for two reasons:
# - To avoid retrying
# - To populate the errors column asynchronously
# This record is just in memory - we're ok retrying after every restart.
# You can disable downloads on a per-row basis by putting "[nodownload]" in the notes column.
if sheet_row is not None and self.media_dir is not None and "[nodownload]" not in sheet_row["notes"]:
for url in sheet_row['image_links']:
if url not in self.media_downloads:
self.media_downloads[url] = gevent.spawn(self.download_media, url)
# Greenlet.exception is populated if the greenlet failed with an exception,
# or None otherwise (success or not finished).
# We treat a failure to fetch a URL like a parse error.
e = self.media_downloads[url].exception
if e is not None:
sheet_row.setdefault("_parse_errors", []).append(
f"Failed to download media link {url:!r}: {e}"
)
if db_row is not None:
# If no database error, but we have parse errors, indicate they should be displayed.
if db_row.error is None and sheet_row is not None and sheet_row.get('_parse_errors'):
@ -300,6 +326,20 @@ class EventsSync(SheetSync):
super().sync_row(sheet_row, db_row)
def download_media(self, url):
hostname = urlparse(url).hostname
if hostname in ("youtu.be", "youtube.com"):
    self.logger.info(f"Ignoring url {url!r}: Blocklisted hostname")
    return
if check_for_media(self.media_dir, url):
self.logger.info(f"Already have content for url {url:!r}")
return
try:
download_media(url, self.media_dir)
except Exception:
self.logger.warning(f"Failed to download url {url:!r}", exc_info=True)
raise
self.logger.info(f"Downloaded media for url {url:!r}")
class ArchiveSync(EventsSync):
# Archive events are a special case of event with less input columns.
@ -372,7 +412,7 @@ class PlaylistsSync(SheetSync):
event_id: The id of the streamlog event to sync
""",
)
def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0, media_dir="."):
"""
Sheet sync constantly scans a Google Sheets sheet and a database, copying inputs from the sheet
to the DB and outputs from the DB to the sheet.
@ -466,7 +506,10 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
"playlists": PlaylistsSync,
"archive": ArchiveSync,
}[config["type"]]
sync = sync_class(config["name"], middleware, stop, dbmanager, reverse_sync)
sync_class_kwargs = {}
if config["type"] == "events" and config.get("download_media", False):
sync_class_kwargs["media_dir"] = media_dir
sync = sync_class(config["name"], middleware, stop, dbmanager, reverse_sync, **sync_class_kwargs)
workers.append(sync)
jobs = [gevent.spawn(worker.run) for worker in workers]
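Stripped of the sheet plumbing, the per-URL download caching in EventsSync.sync_row reduces to this pattern (standalone sketch with a dummy fetch function):

# Spawn at most one greenlet per URL, then poll Greenlet.exception on later passes.
import gevent

downloads = {}  # url -> greenlet; only grows, so failures are retried only after a restart

def fetch(url):
    if "bad" in url:
        raise ValueError("simulated failure")

def note_errors(url, errors):
    if url not in downloads:
        downloads[url] = gevent.spawn(fetch, url)
    # Greenlet.exception is None until the greenlet has died with an exception
    e = downloads[url].exception
    if e is not None:
        errors.append(f"Failed to download media link {url!r}: {e}")

errors = []
note_errors("https://example.com/ok.png", errors)
note_errors("https://example.com/bad.png", errors)
gevent.sleep(0)  # let the spawned greenlets run (gevent also prints the failed one's traceback)
note_errors("https://example.com/bad.png", errors)  # second pass records the failure
print(errors)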
