mirror of https://github.com/ekimekim/wubloader
Compare commits: 3f6263a037...7483794a23 (9 commits)
Author | SHA1 | Date
---|---|---
Mike Lang | 7483794a23 | 2 months ago
Mike Lang | 3e7cb38cf0 | 2 months ago
Mike Lang | 7b590cf574 | 2 months ago
Mike Lang | 15f86551d4 | 2 months ago
Mike Lang | 9dfb00f4ab | 2 months ago
Mike Lang | 2855ec759d | 2 months ago
Mike Lang | b46c577014 | 2 months ago
Mike Lang | 352c9e9081 | 2 months ago
Mike Lang | 07055e3605 | 2 months ago
@@ -0,0 +1,27 @@
"""A group of greenlets running tasks.
Each task has a key. If a task with that key is already running,
it is not re-run."""

import gevent


class KeyedGroup:
	def __init__(self):
		self.greenlets = {}

	def spawn(self, key, func, *args, **kwargs):
		if key not in self.greenlets:
			self.greenlets[key] = gevent.spawn(self._wrapper, key, func, *args, **kwargs)
		return self.greenlets[key]

	def wait(self):
		"""Blocks until all tasks started before wait() was called are finished."""
		gevent.wait(self.greenlets.values())

	def _wrapper(self, key, func, *args, **kwargs):
		try:
			return func(*args, **kwargs)
		finally:
			assert self.greenlets[key] is gevent.getcurrent()
			del self.greenlets[key]
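Not part of the diff: a minimal usage sketch of KeyedGroup (the fetch_url task and urls are made up for illustration), showing that spawning the same key twice while the first task is still running returns the already-running greenlet instead of starting a second one.

	import gevent

	def fetch_url(url):
		gevent.sleep(0.1)  # placeholder for real work
		return url

	group = KeyedGroup()
	for url in ["https://example.com/a", "https://example.com/a", "https://example.com/b"]:
		# Duplicate keys are coalesced into the one already-running greenlet.
		group.spawn(url, fetch_url, url)
	assert len(group.greenlets) == 2
	group.wait()  # blocks until both tasks have finished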
@@ -0,0 +1,247 @@
import json
import logging
import os
import re
import socket
import time
import urllib.parse
from base64 import b64encode
from hashlib import sha256
from uuid import uuid4

import gevent
import prometheus_client as prom
import urllib3.connection
from ipaddress import ip_address

from . import atomic_write, ensure_directory, jitter, listdir
from .stats import timed


media_bytes_downloaded = prom.Counter(
	"media_bytes_downloaded",
	"Number of bytes of media files downloaded. Includes data downloaded then later rejected.",
)

media_bytes_saved = prom.Histogram(
	"media_bytes_saved",
	"Size in bytes of downloaded media that was successfully saved",
	["content_type"],
	buckets = [2**n for n in range(11, 27, 2)],
)

media_already_exists = prom.Counter(
	"media_already_exists",
	"Count of times we downloaded a file but it already existed",
)


class Rejected(Exception):
	"""Indicates a non-retryable failure due to the url response violating our constraints"""

class TooLarge(Rejected):
	"""Response was too large"""

class ForbiddenDestination(Rejected):
	"""Hostname resolved to non-global IP"""

class BadScheme(Rejected):
	"""Bad url scheme"""

class WrongContent(Rejected):
	"""Response was not a video or image"""

class FailedResponse(Rejected):
	"""Got a 4xx response, probably a bad link"""


def check_for_media(output_dir, url):
	"""Returns True if we have at least one version of content for the given url already."""
	url_dir = get_url_dir(output_dir, url)
	return any(filename.endswith(".metadata.json") for filename in listdir(url_dir))


@timed()
def download_media(
	url,
	output_dir,
	max_size=128*2**20, # 128MiB
	timeout=60,
	content_types=("image", "video", "application/pdf"),
	max_redirects=5,
	retries=3,
	retry_interval=1,
	chunk_size=64*1024, # 64KiB
):
	"""Make a GET request to a potentially malicious URL and download the content to file.
	We check the following:
	- That the host is a public IP
	- That the response does not exceed the given max size (default 128MiB)
	- That the content type is in the given list
	  (the list may contain exact types like "image/png" or categories like "image")
	- That the whole thing doesn't take more than a timeout
	Redirects *will* be followed but the follow-up requests must obey the same rules
	(and do not reset the timeout).

	We save the file to OUTPUT_DIR/URL_HASH/FILE_HASH.EXT where EXT is guessed from content-type.
	We save additional metadata including the url and content type to OUTPUT_DIR/URL_HASH/FILE_HASH.metadata.json

	Raises on any rule violation or non-200 response.
	"""
	# Stores a list of urls redirected to, latest is current.
	urls = [url]

	with gevent.Timeout(timeout):
		for redirect_number in range(max_redirects):
			errors = []
			for retry in range(retries):
				if retry > 0:
					gevent.sleep(jitter(retry_interval))

				try:
					resp = _request(urls[-1], max_size, content_types)

					new_url = resp.get_redirect_location()
					if new_url:
						urls.append(new_url)
						break # break from retry loop, continuing in the redirect loop

					_save_response(output_dir, urls, resp, max_size, chunk_size)
					return
				except Rejected:
					raise
				except Exception as e:
					errors.append(e)
					# fall through to next retry loop
			else:
				# This block will be reached if range(retries) runs out but not via "break"
				raise ExceptionGroup(f"All retries failed for url {urls[-1]}", errors)

	raise Exception("Too many redirects")
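A sketch of how a caller might use these two entry points together; the module path common.media, the output directory, and the url below are assumptions for illustration only.

	import logging
	from common.media import check_for_media, download_media, Rejected

	output_dir = "/srv/wubloader/media"    # hypothetical location
	url = "https://example.com/image.png"  # hypothetical link

	if not check_for_media(output_dir, url):
		try:
			download_media(url, output_dir)
		except Rejected as e:
			# Non-retryable: the url broke one of the rules above (bad scheme,
			# non-global IP, wrong content type, too large, 4xx response).
			logging.warning(f"Rejected media url {url}: {e}")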
def hash_to_path(hash):
	return b64encode(hash.digest(), b"-_").decode().rstrip("=")


def get_url_dir(output_dir, url):
	return os.path.join(output_dir, hash_to_path(sha256(url.encode())))
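For illustration, the on-disk layout these helpers produce (url and output directory hypothetical): the sha256 of the url, rendered as url-safe unpadded base64 (43 characters), names the directory, and the sha256 of the downloaded content names the files inside it.

	from hashlib import sha256

	url = "https://example.com/image.png"  # hypothetical
	url_dir = get_url_dir("/srv/wubloader/media", url)
	# e.g. /srv/wubloader/media/<43-char url-safe base64 of sha256(url)>
	# Content for this url is then stored alongside its metadata as:
	#   <url_dir>/<FILE_HASH>.png
	#   <url_dir>/<FILE_HASH>.metadata.json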
def _save_response(output_dir, urls, resp, max_size, chunk_size):
	url_dir = get_url_dir(output_dir, urls[0])
	temp_path = os.path.join(url_dir, f".{uuid4()}.temp")
	ensure_directory(temp_path)

	content_type = resp.headers["content-type"]
	# Content type may have form "TYPE ; PARAMS", strip params if present.
	# Also normalize for whitespace and case.
	content_type = content_type.split(";")[0].strip().lower()
	# We attempt to convert content type to an extension by taking the second part
	# and stripping anything past the first character not in [a-z0-9.-].
	# So eg. "image/png" -> "png", "image/svg+xml" -> "svg", "image/../../../etc/password" -> ""
	ext = content_type.split("/")[-1]
	ext = re.match(r"^[a-z0-9.-]*", ext).group(0)

	try:
		length = 0
		hash = sha256()
		with open(temp_path, "wb") as f:
			while True:
				chunk = resp.read(chunk_size)
				if not chunk:
					break
				hash.update(chunk)
				length += len(chunk)
				media_bytes_downloaded.inc(len(chunk))
				if length > max_size:
					raise TooLarge(f"Read more than {max_size} bytes from url {urls[-1]}")
				f.write(chunk)

		filename = f"{hash_to_path(hash)}.{ext}"
		filepath = os.path.join(url_dir, filename)
		# This is vulnerable to a race where two things create the file at once,
		# but that's fine since it will always have the same content. This is just an optimization
		# to avoid replacing the file over and over (and for observability)
		if os.path.exists(filepath):
			logging.info(f"Discarding downloaded file for {urls[0]} as it already exists")
			media_already_exists.inc()
		else:
			os.rename(temp_path, filepath)
			logging.info(f"Downloaded file for {urls[0]}")
			media_bytes_saved.labels(content_type).observe(length)
	finally:
		if os.path.exists(temp_path):
			os.remove(temp_path)

	metadata_path = os.path.join(url_dir, f"{hash_to_path(hash)}.metadata.json")
	# Again, this is racy but we don't care about double-writes.
	# Note it's entirely possible for the image to already exist but still write the metadata,
	# this can happen if a previous attempt crashed midway.
	if not os.path.exists(metadata_path):
		metadata = {
			"url": urls[0],
			"filename": filename,
			"redirects": urls[1:],
			"content_type": resp.headers["content-type"],
			"fetched_by": socket.gethostname(),
			"fetch_time": time.time(),
		}
		atomic_write(metadata_path, json.dumps(metadata, indent=4))
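The metadata written alongside each saved file (at <url_dir>/<FILE_HASH>.metadata.json) ends up looking roughly like the following; all values here are hypothetical, and the redirects list holds any urls that were followed after the original one.

	{
	    "url": "https://example.com/image.png",
	    "filename": "<FILE_HASH>.png",
	    "redirects": [
	        "https://cdn.example.com/image.png"
	    ],
	    "content_type": "image/png",
	    "fetched_by": "wubloader-downloader-1",
	    "fetch_time": 1712345678.9
	}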
def _request(url, max_size, content_types):
	"""Do the actual request and return a vetted response object, which is either the content
	(status 200) or a redirect.
	Raises Rejected if content fails checks, anything else should be considered retryable."""
	parsed = urllib.parse.urlparse(url)
	hostname = parsed.hostname
	port = parsed.port

	ip = socket.gethostbyname(hostname)
	if not ip_address(ip).is_global:
		raise ForbiddenDestination(f"Non-global IP {ip} for url {url}")

	# In order to provide the host/ip to connect to separately from the URL,
	# we need to drop to a fairly low-level interface.
	if parsed.scheme == "http":
		conn = urllib3.connection.HTTPConnection(ip, port or 80)
	elif parsed.scheme == "https":
		conn = urllib3.connection.HTTPSConnection(
			ip, port or 443,
			assert_hostname = hostname,
			server_hostname = hostname,
		)
	else:
		raise BadScheme(f"Bad scheme {parsed.scheme!r} for url {url}")

	conn.request("GET", url, preload_content=False)
	resp = conn.getresponse()

	# Redirects do not require further checks
	if resp.get_redirect_location():
		return resp

	# 4xx errors are non-retryable, anything else is.
	if 400 <= resp.status < 500:
		raise FailedResponse(f"Url returned {resp.status} response: {url}")
	elif not (200 <= resp.status < 300):
		raise Exception(f"Url returned {resp.status} response: {url}")

	content_type = resp.getheader("content-type")
	if content_type is None:
		raise Exception(f"No content-type given for url {url}")
	if not any(content_type.startswith(target) for target in content_types):
		raise WrongContent(f"Disallowed content-type {content_type} for url {url}")

	# If length is known but too large, reject early
	length = resp.getheader("content-length")
	if length is not None:
		try:
			length = int(length)
		except ValueError:
			raise Exception(f"Invalid content length {length!r} for url {url}")
		if length > max_size:
			raise TooLarge(f"Content length {length} is too large for url {url}")

	return resp
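A stripped-down sketch of the connect-by-IP pattern used in _request (hostname hypothetical): resolving once, vetting the address, and then connecting to that exact IP avoids a second DNS lookup inside the HTTP library returning a different, possibly private, address, while server_hostname/assert_hostname keep SNI and certificate verification tied to the original hostname.

	import socket
	import urllib3.connection
	from ipaddress import ip_address

	hostname = "example.com"  # hypothetical
	ip = socket.gethostbyname(hostname)
	if not ip_address(ip).is_global:
		raise ValueError(f"refusing to fetch from non-global IP {ip}")

	conn = urllib3.connection.HTTPSConnection(
		ip, 443,
		server_hostname=hostname,  # SNI still carries the real hostname
		assert_hostname=hostname,  # and the certificate must match it
	)
	conn.request("GET", f"https://{hostname}/", preload_content=False)
	resp = conn.getresponse()
	print(resp.status, resp.getheader("content-type"))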