From 3e738262bcdf77618a657cba4c866f2ed91de9e2 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 13 Nov 2024 17:44:34 +1100 Subject: [PATCH] move bus_synthesizer logic to thrimshim and adapt to existing api --- bus_synthesizer/bus_synthesizer/__init__.py | 0 bus_synthesizer/bus_synthesizer/__main__.py | 16 --- .../thrimshim/bus_stats.py | 96 ++++-------------- thrimshim/thrimshim/main.py | 99 +++---------------- 4 files changed, 30 insertions(+), 181 deletions(-) delete mode 100644 bus_synthesizer/bus_synthesizer/__init__.py delete mode 100644 bus_synthesizer/bus_synthesizer/__main__.py rename bus_synthesizer/bus_synthesizer/main.py => thrimshim/thrimshim/bus_stats.py (75%) diff --git a/bus_synthesizer/bus_synthesizer/__init__.py b/bus_synthesizer/bus_synthesizer/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/bus_synthesizer/bus_synthesizer/__main__.py b/bus_synthesizer/bus_synthesizer/__main__.py deleted file mode 100644 index d0e78f7..0000000 --- a/bus_synthesizer/bus_synthesizer/__main__.py +++ /dev/null @@ -1,16 +0,0 @@ -import gevent.monkey -gevent.monkey.patch_all() - -import logging -import os - -import argh - -from thrimshim.main import main - -LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s" - -level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper() -logging.basicConfig(level=level, format=LOG_FORMAT) -argh.dispatch_command(main) - diff --git a/bus_synthesizer/bus_synthesizer/main.py b/thrimshim/thrimshim/bus_stats.py similarity index 75% rename from bus_synthesizer/bus_synthesizer/main.py rename to thrimshim/thrimshim/bus_stats.py index 0002594..bfebda3 100644 --- a/bus_synthesizer/bus_synthesizer/main.py +++ b/thrimshim/thrimshim/bus_stats.py @@ -1,41 +1,13 @@ import datetime import itertools -import json import math import operator -import argh -import base64 -import flask -import gevent -import gevent.backdoor -from gevent.pywsgi import WSGIServer +import common.database -import common -from common import database -from common.flask_stats import request_stats, after_request - -app = flask.Flask('bus_synthesizer') -app.after_request(after_request) MAX_SPEED = 45 / 3600 -def cors(app): - """WSGI middleware that sets CORS headers""" - HEADERS = [ - ("Access-Control-Allow-Credentials", "false"), - ("Access-Control-Allow-Headers", "*"), - ("Access-Control-Allow-Methods", "GET,POST,HEAD"), - ("Access-Control-Allow-Origin", "*"), - ("Access-Control-Max-Age", "86400"), - ] - def handle(environ, start_response): - def _start_response(status, headers, exc_info=None): - headers += HEADERS - return start_response(status, headers, exc_info) - return app(environ, _start_response) - return handle - def post_process_miles(seconds, miles, days): good = [] @@ -169,17 +141,16 @@ def post_process_clocks(seconds, clocks, days): return corrected_clocks -@app.route('/bus_synthesizer/latest') -@request_stats -def latest(): + +def get_latest(channel, conn): ago_30_min = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) - query = common.database.query(app.db_manager.get_conn(), """ + query = common.database.query(conn, """ SELECT timestamp, odometer, clock, timeofday FROM bus_data WHERE timestamp > %(start)s - --AND NOT segment LIKE '%%partial%%' + AND channel = %(channel)s ORDER BY timestamp; - """, start=ago_30_min) + """, channel=channel, start=ago_30_min) rows = query.fetchall() times, miles, clocks, days = zip(*rows) @@ -223,8 +194,9 @@ def latest(): # before 6:40 is pm if latest['clock'] < 6 * 60 + 40: is_pm = True + # after 7:00 is am elif latest['clock'] >= 7 * 60: - is_pm = True + is_pm = False else: # 6:40 to 7:00 is ambiguous; look back 21 min twenty_one = None @@ -237,11 +209,9 @@ def latest(): else: is_pm = True - hour = latest['clock'] // 60 - minute = latest['clock'] % 60 + processed_clock = latest['clock'] if is_pm: - hour += 12 - proccessed_clock = '{}:{:02d}'.format(hour, minute) + processed_clock += 12 * 60 else: proccessed_clock = None @@ -257,10 +227,7 @@ def latest(): now_second = (now - times[0]) / datetime.timedelta(seconds=1) if latest['clock']: diff = int(math.floor((now - latest['time']) / datetime.timedelta(minutes = 1))) - new_clock = hour * 60 + minute + diff - minute = new_clock % 60 - hour = (new_clock // 60) % 24 - predicted_clock = '{}:{:02d}'.format(hour, minute) + predicted_clock = processed_clock + diff else: predicted_clock = None @@ -268,41 +235,12 @@ def latest(): else: predicted = None, None, None, None - output = {'raw':tuple_to_dict(raw), - 'post_processed':tuple_to_dict(processed), - 'predicted':tuple_to_dict(predicted), - } - - return to_json(output) + return { + 'raw': tuple_to_dict(raw), + 'post_processed': tuple_to_dict(processed), + 'predicted': tuple_to_dict(predicted), + } -def tuple_to_dict(t, names=['time', 'mile', 'ToD', 'clock']): +def tuple_to_dict(t, names=['time', 'odometer', 'timeofday', 'clock_minutes']): return {names[i]:t[i] for i in range(len(t))} - - -# copied from thrimshim -def to_json(obj): - def convert(value): - if isinstance(value, datetime.datetime): - return value.isoformat() - if isinstance(value, datetime.timedelta): - return value.total_seconds() - if isinstance(value, memoryview) or isinstance(value, bytes): - return base64.b64encode(bytes(value)).decode() - raise TypeError(f"Can't convert object of type {value.__class__.__name__} to JSON: {value}") - return json.dumps(obj, default=convert) - - -def main(connection_string, host='0.0.0.0', port=8004, backdoor_port=0): - - server = WSGIServer((host, port), cors(app)) - - app.db_manager = database.DBManager(dsn=connection_string) - - common.PromLogCountsHandler.install() - common.install_stacksampler() - - if backdoor_port: - gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() - - common.serve_with_graceful_shutdown(server) diff --git a/thrimshim/thrimshim/main.py b/thrimshim/thrimshim/main.py index dd0cf51..865fc5e 100644 --- a/thrimshim/thrimshim/main.py +++ b/thrimshim/thrimshim/main.py @@ -15,13 +15,15 @@ import prometheus_client from psycopg2 import sql import common -from common import database, dateutil +from common import database from common.flask_stats import request_stats, after_request from common.segments import KNOWN_XFADE_TRANSITIONS, CUSTOM_XFADE_TRANSITIONS import google.oauth2.id_token import google.auth.transport.requests +from . import bus_stats + app = flask.Flask('thrimshim') app.after_request(after_request) @@ -777,99 +779,24 @@ def get_thumbnail(ident): return flask.Response(bytes(event['thumbnail_image']), mimetype='image/png') else: return '', 404 - @app.route('/thrimshim/bus/') @request_stats def get_odometer(channel): """Not directly thrimbletrimmer related but easiest to put here as we have DB access. - Checks DB for the most recent odometer reading as of `time` param (default now). - However, won't consider readings older than `range` param (default 1 minute) - You can also pass `extrapolate`=`true` to try to extrapolate the reading at your requested time - based on the last known time. Note it will still only look within `range` for the last known time. - If it can't find a reading, returns 0. + Checks DB for the most recent odometer reading. + Optional param type can be one of: + raw: The most recent value, unmodified + processed: The most recent value, excluding outliers and detected mistakes + predicted: processed, but extrapolated to the current time. """ - time = flask.request.args.get("time") - if time is None: - time = datetime.datetime.utcnow() - else: - time = dateutil.parse(time) - - range = int(flask.request.args.get("range", "60")) - range = datetime.timedelta(seconds=range) - - extrapolate = (flask.request.args.get("extrapolate") == "true") - + type = flask.request.args.get("type") conn = app.db_manager.get_conn() - start = time - range - end = time - - # Get newest non-errored row within time range - # Exclude obviously wrong values, in particular 7000 which 1000 is mistaken for. - results = database.query(conn, """ - SELECT timestamp, odometer - FROM bus_data - WHERE odometer IS NOT NULL - AND odometer >= 109 - AND odometer < 7000 - AND channel = %(channel)s - AND timestamp > %(start)s - AND timestamp <= %(end)s - ORDER BY timestamp DESC - LIMIT 1 - """, channel=channel, start=start, end=end) - result = results.fetchone() - if result is None: - odometer = None - elif extrapolate: - # Current extrapolate strategy is very simple: presume we're going at full speed (45mph). - SPEED = 45. / 3600 # in miles per second - delta_t = (time - result.timestamp).total_seconds() - delta_odo = delta_t * SPEED - odometer = result.odometer + delta_odo - else: - odometer = result.odometer - - results = database.query(conn, """ - SELECT timestamp, clock, timeofday - FROM bus_data - WHERE clock IS NOT NULL - AND channel = %(channel)s - AND timestamp > %(start)s - AND timestamp <= %(end)s - ORDER BY timestamp DESC - LIMIT 1 - """, channel=channel, start=start, end=end) - result = results.fetchone() - if result is None: - clock24h = None - timeofday = None - clock_face = None - else: - clock12h = result.clock - # HACK: assume null means dawn, as we can reliably detect everything else. - timeofday = result.timeofday or "dawn" - - clock24h = clock12h - if time_is_pm(conn, result.timestamp, clock12h, timeofday): - clock24h += 720 - - if extrapolate: - delta_t = (time - result.timestamp).total_seconds() - clock12h += delta_t - clock24h += delta_t - - clock12h %= 720 - clock24h %= 1440 - clock_face = "{}:{:02d}".format(clock12h // 60, int(clock12h % 60)) - - return { - "odometer": odometer, - "clock_minutes": clock24h, - "clock": clock_face, - "timeofday": timeofday, - } + values = bus_stats.get_latest(channel, conn) + if type not in values: + return f"Unknown type: {type!r}", 400 + return to_json(values[type]) def time_is_pm(conn, timestamp, clock, timeofday):