From 76daceff19dab6dbb42f9dbfa36fa6db1033bb02 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Fri, 10 Nov 2023 10:24:26 +1100 Subject: [PATCH] bus_analyzer: initial implementation --- .github/workflows/main.yml | 1 + bus_analyzer/Dockerfile | 14 ++ bus_analyzer/bus_analyzer/__init__.py | 0 bus_analyzer/bus_analyzer/__main__.py | 16 +++ bus_analyzer/bus_analyzer/extract.py | 3 + bus_analyzer/bus_analyzer/main.py | 177 ++++++++++++++++++++++++++ bus_analyzer/setup.py | 18 +++ postgres/setup.sh | 13 +- 8 files changed, 237 insertions(+), 5 deletions(-) create mode 100644 bus_analyzer/Dockerfile create mode 100644 bus_analyzer/bus_analyzer/__init__.py create mode 100644 bus_analyzer/bus_analyzer/__main__.py create mode 100644 bus_analyzer/bus_analyzer/extract.py create mode 100644 bus_analyzer/bus_analyzer/main.py create mode 100644 bus_analyzer/setup.py diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 2b8a46a..8c8eb06 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -36,6 +36,7 @@ jobs: - playlist_manager - chat_archiver - zulip_bots + - bus_analyzer steps: - name: Check out repo uses: actions/checkout@v2 diff --git a/bus_analyzer/Dockerfile b/bus_analyzer/Dockerfile new file mode 100644 index 0000000..9f643b9 --- /dev/null +++ b/bus_analyzer/Dockerfile @@ -0,0 +1,14 @@ +FROM alpine:3.14 +# dependencies needed for compiling c extensions +# also busybox-extras for telnet for easier use of backdoor +RUN apk --update add py3-pip g++ python3-dev libffi-dev musl-dev file make busybox-extras + +# Install gevent so that we don't need to re-install it when common changes +RUN pip install gevent + +# Actual application +COPY bus_analyzer /tmp/bus_analyzer +RUN pip install /tmp/bus_analyzer && rm -r /tmp/bus_analyzer + +LABEL org.opencontainers.image.source https://github.com/dbvideostriketeam/wubloader +ENTRYPOINT ["python3", "-m", "bus_analyzer"] diff --git a/bus_analyzer/bus_analyzer/__init__.py 
b/bus_analyzer/bus_analyzer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bus_analyzer/bus_analyzer/__main__.py b/bus_analyzer/bus_analyzer/__main__.py new file mode 100644 index 0000000..c722e54 --- /dev/null +++ b/bus_analyzer/bus_analyzer/__main__.py @@ -0,0 +1,16 @@ + +import gevent.monkey +gevent.monkey.patch_all() + +import logging +import os + +import argh + +from .main import cli + +LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s" + +level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper() +logging.basicConfig(level=level, format=LOG_FORMAT) +cli() diff --git a/bus_analyzer/bus_analyzer/extract.py b/bus_analyzer/bus_analyzer/extract.py new file mode 100644 index 0000000..b5a89e5 --- /dev/null +++ b/bus_analyzer/bus_analyzer/extract.py @@ -0,0 +1,3 @@ + +def extract_segment(path): + raise NotImplementedError diff --git a/bus_analyzer/bus_analyzer/main.py b/bus_analyzer/bus_analyzer/main.py new file mode 100644 index 0000000..0f2a509 --- /dev/null +++ b/bus_analyzer/bus_analyzer/main.py @@ -0,0 +1,177 @@ + +import datetime +import logging +import os +import signal +import traceback + +import argh +import gevent.event + +from common import database +from common.segments import parse_segment_path + +from .extract import extract_segment + + +cli = argh.EntryPoint() + + +@cli +@argh.named("extract-segment") +def do_extract_segment(*segment_paths): + """Extract info from individual segments and print them""" + for segment_path in segment_paths: + odometer = extract_segment(segment_path) + print(f"{segment_path} {odometer}") + + +@cli +@argh.named("analyze-segment") +def do_analyze_segment(dbconnect, *segment_paths, base_dir='.'): + """Analyze individual segments and write them to the database""" + dbmanager = database.DBManager(dsn=dbconnect) + conn = dbmanager.get_conn() + + for segment_path in segment_paths: + analyze_segment(conn, segment_path) + + +def analyze_segment(conn, segment_path, 
check_segment_name=None): + segment_info = parse_segment_path(segment_path) + segment_name = '/'.join(segment_path.split('/')[-4:]) # just keep last 4 path parts + if check_segment_name is not None: + assert segment_name == check_segment_name + + try: + odometer = extract_segment(segment_path) + except Exception: + logging.warning(f"Failed to extract segment {segment_path!r}", exc_info=True) + odometer = None + error = traceback.format_exc() + else: + logging.info(f"Got odometer = {odometer} for segment {segment_path!r}") + error = None + + database.query( + conn, + """ + INSERT INTO bus_data (channel, timestamp, segment, error, odometer) + VALUES (%(channel)s, %(timestamp)s, %(segment)s, %(error)s, %(odometer)s) + ON CONFLICT (channel, timestamp, segment) DO UPDATE + SET error = %(error)s, + odometer = %(odometer)s + """, + channel=segment_info.channel, + timestamp=segment_info.start, + segment=segment_name, + error=error, + odometer=odometer, + ) + + +def analyze_hour(conn, existing_segments, base_dir, channel, quality, hour): + hour_path = os.path.join(base_dir, channel, quality, hour) + try: + segments = os.listdir(hour_path) + except FileNotFoundError: + logging.info(f"No such hour {hour_path!r}, skipping") + return + + logging.info("Found {} segments for hour {!r}".format(len(segments), hour_path)) + segments_to_do = [] + for segment in segments: + # Format as relative path from basedir, this is the format the DB expects. 
+ segment_name = os.path.join(channel, quality, hour, segment) + if segment_name in existing_segments: + continue + + segment_path = os.path.join(base_dir, segment_name) + assert segment_path == os.path.join(hour_path, segment) + + segments_to_do.append((segment_path, segment_name)) + + logging.info("Found {} segments not already existing".format(len(segments_to_do))) + for segment_path, segment_name in segments_to_do: + analyze_segment(conn, segment_path, segment_name) + + +def parse_hours(s): + try: + return int(s) + except ValueError: + return s.split(",") + + +@cli +@argh.arg("--hours", type=parse_hours, help="If integer, watch the most recent N hours. Otherwise, comma-seperated list of hours.") +def main( + dbconnect, + *channels, + base_dir='.', + quality='source', + hours=2, + run_once=False, + overwrite=False, +): + CHECK_INTERVAL = 2 + + stopping = gevent.event.Event() + + gevent.signal_handler(signal.SIGTERM, stopping.set) + + db_manager = database.DBManager(dsn=dbconnect) + conn = db_manager.get_conn() + + logging.info("Started") + + while not stopping.is_set(): + start_time = datetime.datetime.utcnow() + + # If we aren't using a hard-coded hours list, work out hours based on current time + if isinstance(hours, int): + do_hours = [ + (start_time - datetime.timedelta(hours=hours_ago)).strftime("%Y-%m-%dT%H") + for hours_ago in range(hours) + ] + else: + do_hours = hours + + # Unless we're overwriting, fetch a list of existing segments from the database. + # We can optimize a little here by restricting to the channels and hour range we need. 
+ if overwrite: + existing_segments = set() + else: + start = datetime.datetime.strptime(min(do_hours), "%Y-%m-%dT%H") + end = datetime.datetime.strptime(max(do_hours), "%Y-%m-%dT%H") + logging.info("Fetching existing segments from {} to {} for {}".format( + start, + end, + ", ".join(channels), + )) + result = database.query(conn, """ + SELECT segment + FROM bus_data + WHERE channel IN %(channels)s + AND timestamp >= %(start)s::timestamp + AND timestamp < %(end)s::timestamp + interval '1 hour' + AND segment IS NOT NULL + """, channels=channels, start=start, end=end) + existing_segments = {segment for (segment,) in result.fetchall()} + logging.info("Found {} existing segments".format(len(existing_segments))) + + for channel in channels: + for hour in do_hours: + analyze_hour(conn, existing_segments, base_dir, channel, quality, hour) + + if run_once: + logging.info("Requested to only run once, stopping") + return + + elapsed = (datetime.datetime.utcnow() - start_time).total_seconds() + remaining = CHECK_INTERVAL - elapsed + if remaining > 0: + logging.info(f"Sleeping {remaining} until next check") + stopping.wait(remaining) + + logging.info("Gracefully stopped") diff --git a/bus_analyzer/setup.py b/bus_analyzer/setup.py new file mode 100644 index 0000000..a9101bd --- /dev/null +++ b/bus_analyzer/setup.py @@ -0,0 +1,18 @@ +from setuptools import setup, find_packages + +setup( + name='bus_analyzer', + version='0.0.1', + author='DB Video Strike Team', + author_email='dbvideostriketeam@gmail.com', + description='', + packages=find_packages(), + install_requires=[ + "argh==0.28.1", + "gevent", + "psycogreen", + "psycopg2", + "python-dateutil", + "wubloader-common", + ], +) diff --git a/postgres/setup.sh b/postgres/setup.sh index 793e0b2..ab31c08 100644 --- a/postgres/setup.sh +++ b/postgres/setup.sh @@ -156,17 +156,20 @@ CREATE TABLE playlists ( -- The "error" column records a free-form human readable message about why a value could not -- be determined. 
-- The odometer column is in miles. The game shows the odometer to the 1/10th mile precision. +-- The segment may be NULL, which indicates a manually-inserted value. +-- The primary key serves two purposes: +-- It provides an index on channel, followed by a range index on timestamp +-- It provides a unique constraint on the same channel, timestamp and segment +-- Note that multiple manual records may exist for the same channel and timestamp +-- as all NULL segment values are considered distinct, so the unique constraint does not hold. CREATE TABLE bus_data ( - timestamp TIMESTAMP NOT NULL, channel TEXT NOT NULL, + timestamp TIMESTAMP NOT NULL, segment TEXT, error TEXT, odometer DOUBLE PRECISION, + PRIMARY KEY (channel, timestamp, segment) ); - --- Range index on timestamp as we will often want the closest timestamp to a requested point. --- Note btree is the default anyway but we use it explicitly here as we want the range behaviour. CREATE INDEX bus_data_timestamp ON bus_data USING btree (timestamp); EOSQL if [ -a /mnt/wubloader/nodes.csv ]; then