From 6c25acfaec2cf694c71dd65c40c31ef6e30a9767 Mon Sep 17 00:00:00 2001
From: Mike Lang
Date: Sun, 8 Jun 2025 17:26:28 +1000
Subject: [PATCH] encode worker

---
 encode-worker.sh    | 188 ++++++++++++++++++++++++++++++++++++++++++++
 postgres/schema.sql |  21 +++++
 postgres/setup.sh   |  12 +++
 3 files changed, 221 insertions(+)
 create mode 100755 encode-worker.sh

diff --git a/encode-worker.sh b/encode-worker.sh
new file mode 100755
index 0000000..d9628a1
--- /dev/null
+++ b/encode-worker.sh
@@ -0,0 +1,188 @@
+#!/bin/bash
+
+# Encode worker: claims encode jobs from the "encodes" table, downloads the
+# source file over scp, runs ffmpeg with the job's arguments, uploads the
+# result and records the output hash.
+# NOTE: indentation is tabs; the <<-SQL heredocs below rely on that.
+
+set -euo pipefail
+
+if [ "$#" -lt 2 ]; then
+	echo "USAGE: $0 NAME CONNINFO" >&2
+	echo "NAME should be a unique name for your node" >&2
+	echo "CONNINFO should be a postgres connection url like postgresql://USER:PASS@HOSTNAME/DATABASE" >&2
+	exit 1
+fi
+
+NAME="$1"
+CONNINFO="$2"
+WORKDIR=${WORKDIR:-.}
+
+# Log a command line to stderr, then run it.
+logcmd() {
+	echo "Running: $*" >&2
+	"$@"
+}
+
+# Run psql against CONNINFO with unaligned, tuples-only, quiet output,
+# aborting on the first SQL error. Extra args are passed through to psql.
+db() {
+	psql -Atqbv ON_ERROR_STOP=on "$CONNINFO" "$@"
+}
+
+# Expects a url matching "scp://USER:PASS@HOST:PORT/PATH"
+# Prints USER PASS HOST PORT PATH, assumes all but path contain no whitespace.
+# Assumes no URL-encoded chars. Returns 1 if the url does not match.
+url_to_parts() {
+	local re='^scp://([^:@]+):([^@]+)@([^:]+):([0-9]+)/(.+)$'
+	if ! [[ "$1" =~ $re ]]; then
+		echo "Could not parse URL: $1" >&2
+		return 1
+	fi
+	printf '%s %s %s %s %s\n' "${BASH_REMATCH[@]:1}"
+}
+
+# Print the local path (under WORKDIR) for the file named by an scp url.
+url_to_filename() {
+	local parts user pass host port path name
+	parts=$(url_to_parts "$1")
+	read -r user pass host port path <<<"$parts"
+	name=$(basename -- "$path")
+	echo "$WORKDIR/$name"
+}
+
+# Copy the remote file named by an scp url into WORKDIR.
+# The password is passed via the SSHPASS env var (sshpass -e) so it does not
+# show up in the logged command line or in the command-line arguments.
+download_file() {
+	local parts user pass host port path file
+	parts=$(url_to_parts "$1")
+	read -r user pass host port path <<<"$parts"
+	file=$(url_to_filename "$1")
+	SSHPASS="$pass" logcmd sshpass -e scp -P "$port" "$user@$host:$path" "$file"
+}
+
+# Copy the WORKDIR file for an scp url to the remote location it names.
+# The password is passed via SSHPASS (sshpass -e) to keep it out of the log.
+upload_file() {
+	local parts user pass host port path file
+	parts=$(url_to_parts "$1")
+	read -r user pass host port path <<<"$parts"
+	file=$(url_to_filename "$1")
+	SSHPASS="$pass" logcmd sshpass -e scp -P "$port" "$file" "$user@$host:$path"
+}
+
+# Run ffmpeg with the given args after substituting {SRC_FILE} and {DEST_FILE}.
+# Usage: encode SRC DEST ARGS...
+# Substitution is done by the shell rather than sed so that paths containing
+# characters like "|", "&" or "\" are inserted literally.
+encode() {
+	local src dest arg args
+	src="$1"
+	dest="$2"
+	shift 2
+	args=()
+	for arg in "$@"; do
+		arg=${arg//'{SRC_FILE}'/"$src"}
+		arg=${arg//'{DEST_FILE}'/"$dest"}
+		args+=("$arg")
+	done
+	logcmd ffmpeg -hide_banner -nostdin -y "${args[@]}"
+}
+
+# Jobs claimed by this node that never finished (dest_hash still unset).
+existing=$(
+	db -v name="$NAME" <<-SQL
+		SELECT claimed_at, dest_url FROM encodes
+		WHERE claimed_by = :'name' AND dest_hash IS NULL
+	SQL
+)
+if [ -n "$existing" ]; then
+	echo "WARNING: The following files are already claimed by this node:"
+	echo "$existing"
+	echo
+	echo -n "This is likely due to a crash. Unclaim these files? [Y/n] > "
+	read -r resp
+	if [[ "$resp" != [nN]* ]]; then
+		# Only release unfinished jobs; completed ones keep claimed_by so
+		# they are never handed out again.
+		db -v name="$NAME" <<-SQL
+			UPDATE encodes SET
+				claimed_by = NULL,
+				claimed_at = NULL
+			WHERE claimed_by = :'name' AND dest_hash IS NULL
+		SQL
+	fi
+fi
+
+while true; do
+	echo "Checking for jobs"
+	# FOR UPDATE SKIP LOCKED stops two workers that poll at the same time
+	# from both picking (and then both claiming) the same row.
+	claimed=$(
+		db -F ' ' -v name="$NAME" <<-SQL
+			UPDATE encodes SET
+				claimed_by = :'name',
+				claimed_at = now()
+			WHERE dest_url = (
+				SELECT dest_url FROM encodes
+				WHERE claimed_by IS NULL AND dest_hash IS NULL
+				LIMIT 1
+				FOR UPDATE SKIP LOCKED
+			)
+			RETURNING src_url, src_hash, dest_url, dest_hash
+		SQL
+	)
+	if [ -z "$claimed" ]; then
+		echo "No available jobs, will check again in 1min"
+		sleep 60
+		continue
+	fi
+
+	read -r src_url src_hash dest_url dest_hash <<<"$claimed"
+	src_file=$(url_to_filename "$src_url")
+	dest_file=$(url_to_filename "$dest_url")
+	echo "Got task to encode $dest_file"
+	# Read encode args separately as we need to split out the array.
+	# The following query outputs one row per arg, separated by nul chars.
+	# readarray -d '' will read into the given array after splitting on nul chars.
+	readarray -td '' encode_args < <(
+		db -0 -v dest_url="$dest_url" <<-SQL
+			SELECT unnest(encode_args) FROM encodes
+			WHERE dest_url = :'dest_url'
+		SQL
+	)
+	if [ -f "$src_file" ]; then
+		if sha256sum --status -c - <<<"$src_hash  $src_file"; then
+			echo "Source file already exists - skipping download"
+		else
+			echo "Existing source file does not match hash - assuming corrupt and re-downloading."
+			rm -- "$src_file"
+		fi
+	fi
+	if ! [ -f "$src_file" ]; then
+		echo "Downloading source file (no progress bar sorry, blame scp)"
+		download_file "$src_url"
+		echo "Checking source file checksum"
+		if ! sha256sum --status -c - <<<"$src_hash  $src_file"; then
+			echo "Downloaded source file does not match expected hash" >&2
+			exit 1
+		fi
+	fi
+	echo "Starting encode"
+	encode "$src_file" "$dest_file" "${encode_args[@]}"
+	echo "Encode complete, uploading output file (still no progress bar)"
+	upload_file "$dest_url"
+	echo "Calculating output hash and marking complete"
+	dest_hash=$(sha256sum "$dest_file" | cut -d' ' -f1)
+	# Re-setting claimed_by *should* be a no-op here but if something has
+	# gone wrong at least we'll know which node is writing.
+	db -v dest_url="$dest_url" -v dest_hash="$dest_hash" -v name="$NAME" <<-SQL
+		UPDATE encodes SET
+			dest_hash = :'dest_hash',
+			claimed_by = :'name',
+			finished_at = now()
+		WHERE dest_url = :'dest_url'
+	SQL
+
+done
diff --git a/postgres/schema.sql b/postgres/schema.sql
index ae0185b..43fd33b 100644
--- a/postgres/schema.sql
+++ b/postgres/schema.sql
@@ -195,3 +195,24 @@ CREATE TABLE templates (
 	crop box_coords NOT NULL,
 	location box_coords NOT NULL
 );
+
+-- Used to farm out encoding jobs to encoder workers.
+-- URL fields must match form: "scp://USER:PASS@HOST:PORT/PATH"
+-- Hash fields are hex strings containing sha256 hashes.
+-- encode_args should be passed verbatim to ffmpeg with the following substitutions:
+--   {SRC_FILE}: The path to the source file
+--   {DEST_FILE}: The path to the output file
+-- Example encode args: '-i' '{SRC_FILE}' '-c' 'copy' '{DEST_FILE}'
+-- The job is considered complete once the dest hash is written.
+-- Jobs may be claimed by writing a worker name to claimed_by.
+-- Timestamp fields are indicative only.
+CREATE TABLE encodes (
+	src_url TEXT NOT NULL,
+	src_hash TEXT NOT NULL,
+	encode_args TEXT[] NOT NULL,
+	dest_url TEXT PRIMARY KEY,
+	dest_hash TEXT,
+	claimed_by TEXT,
+	claimed_at TIMESTAMP,
+	finished_at TIMESTAMP
+);
diff --git a/postgres/setup.sh b/postgres/setup.sh
index 7c83644..a4d6396 100644
--- a/postgres/setup.sh
+++ b/postgres/setup.sh
@@ -80,3 +80,15 @@ if [ -n "$BUSCRIBE_USER" ]; then
 	echo "Applying schema for $BUSCRIBE_DB"
 	sql "$BUSCRIBE_USER" -d "$BUSCRIBE_DB" < /buscribe.sql
 fi
+
+if [ -n "$ENCODER_USER" ]; then
+	echo "Creating $ENCODER_USER"
+	echo "host all $ENCODER_USER all md5" >> "$PGDATA/pg_hba.conf"
+	sql "$POSTGRES_USER" <<-EOSQL
+		CREATE USER $ENCODER_USER WITH CONNECTION LIMIT 50 LOGIN PASSWORD '$ENCODER_PASSWORD';
+		GRANT CONNECT ON DATABASE $POSTGRES_DB TO $ENCODER_USER;
+		GRANT USAGE ON SCHEMA public TO $ENCODER_USER;
+		GRANT SELECT ON TABLE encodes TO $ENCODER_USER;
+		GRANT UPDATE ( dest_hash, claimed_by, claimed_at, finished_at ) ON TABLE encodes TO $ENCODER_USER;
+	EOSQL
+fi