move bus_synthesizer logic to thrimshim and adapt to existing api

bus-post-processing
Mike Lang 2 weeks ago
parent b1e0b45b0b
commit 3e738262bc

@ -1,16 +0,0 @@
# Entry-point script: configures logging and runs thrimshim.main.main as a command-line program.

# gevent monkey-patching is applied first, ahead of every other import below,
# so that modules imported afterwards are loaded with the patches already in place.
import gevent.monkey
gevent.monkey.patch_all()
import logging
import os
import argh
from thrimshim.main import main
# Log line layout: timestamp, level name padded to 8 columns, logger name, module:line, message.
LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s"
# The log level is read from the WUBLOADER_LOG_LEVEL environment variable
# (default 'INFO') and upper-cased before being handed to logging.
level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=level, format=LOG_FORMAT)
# argh dispatches the command line to main.
argh.dispatch_command(main)

@ -1,41 +1,13 @@
import datetime
import itertools
import json
import math
import operator
import argh
import base64
import flask
import gevent
import gevent.backdoor
from gevent.pywsgi import WSGIServer
import common.database
import common
from common import database
from common.flask_stats import request_stats, after_request
# Flask application object, named 'bus_synthesizer'.
app = flask.Flask('bus_synthesizer')
# Register the after_request hook imported from common.flask_stats with the app.
app.after_request(after_request)
# 45 divided by 3600 -- presumably 45 mph expressed in miles per second; TODO confirm against its users.
MAX_SPEED = 45 / 3600
def cors(app):
    """WSGI middleware that sets CORS headers.

    Wraps the WSGI callable ``app`` and returns a new WSGI callable. Every
    response started by ``app`` gets a fixed set of permissive CORS headers
    appended after the application's own headers.

    Args:
        app: the WSGI application to wrap.

    Returns:
        A WSGI callable ``handle(environ, start_response)``.
    """
    HEADERS = [
        ("Access-Control-Allow-Credentials", "false"),
        ("Access-Control-Allow-Headers", "*"),
        ("Access-Control-Allow-Methods", "GET,POST,HEAD"),
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Max-Age", "86400"),
    ]

    def handle(environ, start_response):
        def _start_response(status, headers, exc_info=None):
            # Build a fresh list instead of extending ``headers`` in place:
            # in-place extension would modify the application's own list, so
            # an application that reuses one header list across requests would
            # pick up another copy of the CORS headers on every request.
            return start_response(status, list(headers) + HEADERS, exc_info)

        return app(environ, _start_response)

    return handle
def post_process_miles(seconds, miles, days):
good = []
@ -169,17 +141,16 @@ def post_process_clocks(seconds, clocks, days):
return corrected_clocks
@app.route('/bus_synthesizer/latest')
@request_stats
def latest():
def get_latest(channel, conn):
ago_30_min = datetime.datetime.utcnow() - datetime.timedelta(minutes=30)
query = common.database.query(app.db_manager.get_conn(), """
query = common.database.query(conn, """
SELECT timestamp, odometer, clock, timeofday
FROM bus_data
WHERE timestamp > %(start)s
--AND NOT segment LIKE '%%partial%%'
AND channel = %(channel)s
ORDER BY timestamp;
""", start=ago_30_min)
""", channel=channel, start=ago_30_min)
rows = query.fetchall()
times, miles, clocks, days = zip(*rows)
@ -223,8 +194,9 @@ def latest():
# before 6:40 is pm
if latest['clock'] < 6 * 60 + 40:
is_pm = True
# after 7:00 is am
elif latest['clock'] >= 7 * 60:
is_pm = True
is_pm = False
else:
# 6:40 to 7:00 is ambiguous; look back 21 min
twenty_one = None
@ -237,11 +209,9 @@ def latest():
else:
is_pm = True
hour = latest['clock'] // 60
minute = latest['clock'] % 60
processed_clock = latest['clock']
if is_pm:
hour += 12
proccessed_clock = '{}:{:02d}'.format(hour, minute)
processed_clock += 12 * 60
else:
proccessed_clock = None
@ -257,10 +227,7 @@ def latest():
now_second = (now - times[0]) / datetime.timedelta(seconds=1)
if latest['clock']:
diff = int(math.floor((now - latest['time']) / datetime.timedelta(minutes = 1)))
new_clock = hour * 60 + minute + diff
minute = new_clock % 60
hour = (new_clock // 60) % 24
predicted_clock = '{}:{:02d}'.format(hour, minute)
predicted_clock = processed_clock + diff
else:
predicted_clock = None
@ -268,41 +235,12 @@ def latest():
else:
predicted = None, None, None, None
output = {'raw':tuple_to_dict(raw),
'post_processed':tuple_to_dict(processed),
'predicted':tuple_to_dict(predicted),
}
return to_json(output)
return {
'raw': tuple_to_dict(raw),
'post_processed': tuple_to_dict(processed),
'predicted': tuple_to_dict(predicted),
}
def tuple_to_dict(t, names=['time', 'mile', 'ToD', 'clock']):
def tuple_to_dict(t, names=('time', 'odometer', 'timeofday', 'clock_minutes')):
    """Pair the values of ``t`` with ``names`` positionally and return a dict.

    Args:
        t: sequence of values.
        names: sequence of keys; ``names[i]`` becomes the key for ``t[i]``.
            Names beyond ``len(t)`` are ignored.

    Returns:
        A dict mapping each used name to the value at the same position.

    Raises:
        IndexError: if ``t`` has more values than there are names.
    """
    # zip() alone would silently drop surplus values; keep that case an error.
    if len(t) > len(names):
        raise IndexError("tuple_to_dict: more values than names")
    return dict(zip(names, t))
# copied from thrimshim
def to_json(obj):
    """Serialize ``obj`` to a JSON string, converting a few non-JSON-native types.

    Values that ``json`` cannot encode by itself are converted as follows:
      * ``datetime.datetime`` -> its ``isoformat()`` string
      * ``datetime.timedelta`` -> its ``total_seconds()`` value
      * ``memoryview`` / ``bytes`` -> base64-encoded text

    Raises:
        TypeError: if ``obj`` contains a value of any other unsupported type.
    """
    def convert(value):
        # Called by json.dumps only for values it cannot encode natively.
        if isinstance(value, datetime.datetime):
            return value.isoformat()
        if isinstance(value, datetime.timedelta):
            return value.total_seconds()
        if isinstance(value, (memoryview, bytes)):
            return base64.b64encode(bytes(value)).decode()
        raise TypeError(f"Can't convert object of type {value.__class__.__name__} to JSON: {value}")

    return json.dumps(obj, default=convert)
def main(connection_string, host='0.0.0.0', port=8004, backdoor_port=0):
    """Set up the bus_synthesizer web service and serve it.

    connection_string: passed as the ``dsn`` to ``database.DBManager``.
    host, port: address given to the WSGI server.
    backdoor_port: if non-zero, a gevent backdoor server is started on
        127.0.0.1 at this port.
    """
    # The Flask app is wrapped in the CORS middleware before being handed to the server.
    server = WSGIServer((host, port), cors(app))
    # Stored on the app object so code holding `app` can reach the DB manager.
    app.db_manager = database.DBManager(dsn=connection_string)
    common.PromLogCountsHandler.install()
    common.install_stacksampler()
    if backdoor_port:
        # The backdoor is bound to localhost only and is given this function's
        # local variables, so local names here are visible from the backdoor.
        gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
    common.serve_with_graceful_shutdown(server)

@ -15,13 +15,15 @@ import prometheus_client
from psycopg2 import sql
import common
from common import database, dateutil
from common import database
from common.flask_stats import request_stats, after_request
from common.segments import KNOWN_XFADE_TRANSITIONS, CUSTOM_XFADE_TRANSITIONS
import google.oauth2.id_token
import google.auth.transport.requests
from . import bus_stats
# Flask application object, named 'thrimshim'.
app = flask.Flask('thrimshim')
# Register the after_request hook imported from common.flask_stats with the app.
app.after_request(after_request)
@ -777,99 +779,24 @@ def get_thumbnail(ident):
return flask.Response(bytes(event['thumbnail_image']), mimetype='image/png')
else:
return '', 404
@app.route('/thrimshim/bus/<channel>')
@request_stats
def get_odometer(channel):
"""Not directly thrimbletrimmer related but easiest to put here as we have DB access.
Checks DB for the most recent odometer reading as of `time` param (default now).
However, won't consider readings older than `range` param (default 1 minute)
You can also pass `extrapolate`=`true` to try to extrapolate the reading at your requested time
based on the last known time. Note it will still only look within `range` for the last known time.
If it can't find a reading, returns 0.
Checks DB for the most recent odometer reading.
Optional param type can be one of:
raw: The most recent value, unmodified
processed: The most recent value, excluding outliers and detected mistakes
predicted: processed, but extrapolated to the current time.
"""
time = flask.request.args.get("time")
if time is None:
time = datetime.datetime.utcnow()
else:
time = dateutil.parse(time)
range = int(flask.request.args.get("range", "60"))
range = datetime.timedelta(seconds=range)
extrapolate = (flask.request.args.get("extrapolate") == "true")
type = flask.request.args.get("type")
conn = app.db_manager.get_conn()
start = time - range
end = time
# Get newest non-errored row within time range
# Exclude obviously wrong values, in particular 7000 which 1000 is mistaken for.
results = database.query(conn, """
SELECT timestamp, odometer
FROM bus_data
WHERE odometer IS NOT NULL
AND odometer >= 109
AND odometer < 7000
AND channel = %(channel)s
AND timestamp > %(start)s
AND timestamp <= %(end)s
ORDER BY timestamp DESC
LIMIT 1
""", channel=channel, start=start, end=end)
result = results.fetchone()
if result is None:
odometer = None
elif extrapolate:
# Current extrapolate strategy is very simple: presume we're going at full speed (45mph).
SPEED = 45. / 3600 # in miles per second
delta_t = (time - result.timestamp).total_seconds()
delta_odo = delta_t * SPEED
odometer = result.odometer + delta_odo
else:
odometer = result.odometer
results = database.query(conn, """
SELECT timestamp, clock, timeofday
FROM bus_data
WHERE clock IS NOT NULL
AND channel = %(channel)s
AND timestamp > %(start)s
AND timestamp <= %(end)s
ORDER BY timestamp DESC
LIMIT 1
""", channel=channel, start=start, end=end)
result = results.fetchone()
if result is None:
clock24h = None
timeofday = None
clock_face = None
else:
clock12h = result.clock
# HACK: assume null means dawn, as we can reliably detect everything else.
timeofday = result.timeofday or "dawn"
clock24h = clock12h
if time_is_pm(conn, result.timestamp, clock12h, timeofday):
clock24h += 720
if extrapolate:
delta_t = (time - result.timestamp).total_seconds()
clock12h += delta_t
clock24h += delta_t
clock12h %= 720
clock24h %= 1440
clock_face = "{}:{:02d}".format(clock12h // 60, int(clock12h % 60))
return {
"odometer": odometer,
"clock_minutes": clock24h,
"clock": clock_face,
"timeofday": timeofday,
}
values = bus_stats.get_latest(channel, conn)
if type not in values:
return f"Unknown type: {type!r}", 400
return to_json(values[type])
def time_is_pm(conn, timestamp, clock, timeofday):

Loading…
Cancel
Save