From 742dacd846f53b527943ad12b63c0ceeb044501e Mon Sep 17 00:00:00 2001 From: Christopher Usher Date: Tue, 12 Nov 2024 01:53:31 -0800 Subject: [PATCH 1/4] Initial commit of the bus_synthesizer --- bus_synthesizer/bus_synthesizer/__init__.py | 0 bus_synthesizer/bus_synthesizer/__main__.py | 16 ++ bus_synthesizer/bus_synthesizer/main.py | 193 ++++++++++++++++++++ 3 files changed, 209 insertions(+) create mode 100644 bus_synthesizer/bus_synthesizer/__init__.py create mode 100644 bus_synthesizer/bus_synthesizer/__main__.py create mode 100644 bus_synthesizer/bus_synthesizer/main.py diff --git a/bus_synthesizer/bus_synthesizer/__init__.py b/bus_synthesizer/bus_synthesizer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bus_synthesizer/bus_synthesizer/__main__.py b/bus_synthesizer/bus_synthesizer/__main__.py new file mode 100644 index 0000000..d0e78f7 --- /dev/null +++ b/bus_synthesizer/bus_synthesizer/__main__.py @@ -0,0 +1,16 @@ +import gevent.monkey +gevent.monkey.patch_all() + +import logging +import os + +import argh + +from thrimshim.main import main + +LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s" + +level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper() +logging.basicConfig(level=level, format=LOG_FORMAT) +argh.dispatch_command(main) + diff --git a/bus_synthesizer/bus_synthesizer/main.py b/bus_synthesizer/bus_synthesizer/main.py new file mode 100644 index 0000000..7d94778 --- /dev/null +++ b/bus_synthesizer/bus_synthesizer/main.py @@ -0,0 +1,193 @@ +import datetime +import itertools +import json +import math +import operator + +import argh +import base64 +import flask +import gevent +import gevent.backdoor +from gevent.pywsgi import WSGIServer + +import common +from common import database +from common.flask_stats import request_stats, after_request + +app = flask.Flask('bus_synthesizer') +app.after_request(after_request) + +MAX_SPEED = 45 / 3600 + +def cors(app): + """WSGI middleware that sets CORS headers""" + HEADERS = [ + ("Access-Control-Allow-Credentials", "false"), + ("Access-Control-Allow-Headers", "*"), + ("Access-Control-Allow-Methods", "GET,POST,HEAD"), + ("Access-Control-Allow-Origin", "*"), + ("Access-Control-Max-Age", "86400"), + ] + def handle(environ, start_response): + def _start_response(status, headers, exc_info=None): + headers += HEADERS + return start_response(status, headers, exc_info) + return app(environ, _start_response) + return handle + + +def post_process_miles(seconds, miles, days): + good = [] + suspect = [] + for i in range(1, len(seconds) - 1): + if math.isnan(miles[i]) or miles[i] <= 100: + suspect.append(i) + continue + if days[i] is None or days[i] == 'score': + suspect.append(i) + continue + previous_diff = miles[i] - miles[i - 1] + if previous_diff < 0 or previous_diff > MAX_SPEED * (seconds[i] - seconds[i - 1]): + suspect.append(i) + continue + next_diff = miles[i + 1] - miles[i] + if next_diff < 0 or next_diff > MAX_SPEED * (seconds[i + 1] - seconds[i]): + suspect.append(i) + continue + # handle big jumps to apparently good data + if good and miles[i] - miles[good[-1]] > MAX_SPEED * (seconds[i] - seconds[good[-1]]): + suspect.append(i) + continue + # try to filter out bad data at the start + if not good and miles[i] > 1000: + suspect.append(i) + continue + good.append(i) + + corrected_miles = [miles[i] if i in good else 0. for i in range(len(miles))] + # identify groups of suspicious data and correct them + for k, g in itertools.groupby(enumerate(suspect), lambda x:x[0]-x[1]): + group = map(operator.itemgetter(1), g) + group = list(map(int, group)) + to_fix = [] + for i in group: + back = 1 + # check whether any suspicious data is likely valid and mark it as not suspicious + while True: + if corrected_miles[i - back]: + diff = miles[i] - corrected_miles[i - back] + max_diff = MAX_SPEED * (seconds[i] - seconds[i - back]) + forward_diff = miles[group[-1] + 1] - miles[i] + forward_max_diff = MAX_SPEED * (seconds[group[-1] + 1] - seconds[i]) + if diff >= 0 and diff <= max_diff and forward_diff <= forward_max_diff: + corrected_miles[i] = miles[i] + break + else: + back += 1 + if not corrected_miles[i]: + to_fix.append(i) + + # actually fix remaining suspicious data via linear interpolation + for k, g in itertools.groupby(enumerate(to_fix), lambda x:x[0]-x[1]): + subgroup = map(operator.itemgetter(1), g) + subgroup = list(map(int, subgroup)) + # ignore data from before the first good measurement or after crashes + if subgroup[0] < good[0] or corrected_miles[subgroup[0] - 1] > corrected_miles[subgroup[-1] + 1]: + continue + m = (corrected_miles[subgroup[-1] + 1] - corrected_miles[subgroup[0] - 1]) / (seconds[subgroup[-1] + 1] - seconds[subgroup[0] - 1]) + b = corrected_miles[subgroup[-1] + 1] - m * seconds[subgroup[-1] + 1] + for i in subgroup: + corrected_miles[i] = m * seconds[i] + b + + # custom handling of the start and end + if 0 <= corrected_miles[1] - miles[0] <= MAX_SPEED * (seconds[1] - seconds[0]): + corrected_miles[0] = miles[0] + if 0 <= miles[-1] - corrected_miles[-2] <= MAX_SPEED * (seconds[-1] - seconds[-2]): + corrected_miles[-1] = miles[-1] + + corrected_miles = [mile if mile > 0 else math.nan for mile in corrected_miles] + return corrected_miles + +@app.route('/bus_synthesizer/latest') +@request_stats +def latest(): + ago_30_min = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) + query = common.database.query(app.db_manager.get_conn(), """ + SELECT timestamp, odometer, timeofday + FROM bus_data + WHERE timestamp > %(start)s + --AND NOT segment LIKE '%%partial%%' + ORDER BY timestamp; + """, start=ago_30_min) + rows = query.fetchall() + times, miles, days = zip(*rows) + + seconds = [(time - times[0]) / datetime.timedelta(seconds=1) for time in times] + miles = [math.nan if mile is None else mile for mile in miles] + corrected_miles = post_process_miles(seconds, miles, days) + + raw = times[-1], miles[-1], days[-1] + + latest = None + second_latest = None + for i in range(len(times) - 1, -1, -1): + if not math.isnan(corrected_miles[i]): + if latest is None: + latest = times[i], seconds[i], corrected_miles[i], days[i] + elif second_latest is None: + second_latest = times[i], seconds[i], corrected_miles[i], days[i] + else: + break + + if latest is not None: + processed = latest[0], latest[2], latest[3] + else: + processed = (None, None, None) + + if second_latest is not None: + m = (latest[2] - second_latest[2]) / (latest[1] - second_latest[1]) + b = latest[2] - m * latest[1] + now = datetime.datetime.utcnow() + now_second = (now - times[0]) / datetime.timedelta(seconds=1) + predicted = now, m * now_second + b, days[-1] + else: + predicted = None, None, None + + output = {'raw':tuple_to_dict(raw), + 'post_processed':tuple_to_dict(processed), + 'predicted':tuple_to_dict(predicted), + } + return to_json(output) + + +def tuple_to_dict(t, names=['time', 'mile', 'ToD']): + return {names[i]:t[i] for i in range(len(t))} + + +# copied from thrimshim +def to_json(obj): + def convert(value): + if isinstance(value, datetime.datetime): + return value.isoformat() + if isinstance(value, datetime.timedelta): + return value.total_seconds() + if isinstance(value, memoryview) or isinstance(value, bytes): + return base64.b64encode(bytes(value)).decode() + raise TypeError(f"Can't convert object of type {value.__class__.__name__} to JSON: {value}") + return json.dumps(obj, default=convert) + + +def main(connection_string, host='0.0.0.0', port=8004, backdoor_port=0): + + server = WSGIServer((host, port), cors(app)) + + app.db_manager = database.DBManager(dsn=connection_string) + + common.PromLogCountsHandler.install() + common.install_stacksampler() + + if backdoor_port: + gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() + + common.serve_with_graceful_shutdown(server) From 0095a207f48b8a689ee55b0d92f12d82f6f1a910 Mon Sep 17 00:00:00 2001 From: Christopher Usher Date: Tue, 12 Nov 2024 12:58:04 -0800 Subject: [PATCH 2/4] better post processing of the most recent values --- bus_synthesizer/bus_synthesizer/main.py | 39 +++++++++++++++++++++---- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/bus_synthesizer/bus_synthesizer/main.py b/bus_synthesizer/bus_synthesizer/main.py index 7d94778..3e64d8e 100644 --- a/bus_synthesizer/bus_synthesizer/main.py +++ b/bus_synthesizer/bus_synthesizer/main.py @@ -96,15 +96,44 @@ def post_process_miles(seconds, miles, days): if subgroup[0] < good[0] or corrected_miles[subgroup[0] - 1] > corrected_miles[subgroup[-1] + 1]: continue m = (corrected_miles[subgroup[-1] + 1] - corrected_miles[subgroup[0] - 1]) / (seconds[subgroup[-1] + 1] - seconds[subgroup[0] - 1]) - b = corrected_miles[subgroup[-1] + 1] - m * seconds[subgroup[-1] + 1] + b = corrected_miles[subgroup[-1] + 1] - m * seconds[subgroup[-1] + 1] for i in subgroup: corrected_miles[i] = m * seconds[i] + b - # custom handling of the start and end + # custom handling of the start if 0 <= corrected_miles[1] - miles[0] <= MAX_SPEED * (seconds[1] - seconds[0]): corrected_miles[0] = miles[0] - if 0 <= miles[-1] - corrected_miles[-2] <= MAX_SPEED * (seconds[-1] - seconds[-2]): - corrected_miles[-1] = miles[-1] + + # custom handling of the end + # find the most recent good value + for latest in range(len(seconds) - 1, -1, -1): + if corrected_miles[latest]: + break + to_fix = [] + for i in range(latest + 1, len(seconds)): + back = 1 + while True: + if corrected_miles[i - back]: + diff = miles[i] - corrected_miles[i - back] + max_diff = MAX_SPEED * (seconds[i] - seconds[i - back]) + if diff >= 0 and diff <= max_diff: + corrected_miles[i] = miles[i] + break + else: + back += 1 + if not corrected_miles[i]: + to_fix.append(i) + + # linear interpolation of the end + for k, g in itertools.groupby(enumerate(to_fix), lambda x:x[0]-x[1]): + subgroup = map(operator.itemgetter(1), g) + subgroup = list(map(int, subgroup)) + # ignore the last data point or after crashes + if subgroup[-1] == (len(corrected_miles) - 1) or corrected_miles[subgroup[0] - 1] > corrected_miles[subgroup[-1] + 1]: + continue + m = (corrected_miles[subgroup[-1] + 1] - corrected_miles[subgroup[0] - 1]) / (seconds[subgroup[-1] + 1] - seconds[subgroup[0] - 1]) + for i in subgroup: + corrected_miles[i] = m * seconds[i] + b corrected_miles = [mile if mile > 0 else math.nan for mile in corrected_miles] return corrected_miles @@ -157,7 +186,7 @@ def latest(): output = {'raw':tuple_to_dict(raw), 'post_processed':tuple_to_dict(processed), 'predicted':tuple_to_dict(predicted), - } + } return to_json(output) From f1eda037aba9a02bd8ff29441a3033d18081c75d Mon Sep 17 00:00:00 2001 From: Christopher Usher Date: Tue, 12 Nov 2024 20:53:50 -0800 Subject: [PATCH 3/4] Now post processes and returns the clock --- bus_synthesizer/bus_synthesizer/main.py | 118 ++++++++++++++++++++---- 1 file changed, 102 insertions(+), 16 deletions(-) diff --git a/bus_synthesizer/bus_synthesizer/main.py b/bus_synthesizer/bus_synthesizer/main.py index 3e64d8e..0002594 100644 --- a/bus_synthesizer/bus_synthesizer/main.py +++ b/bus_synthesizer/bus_synthesizer/main.py @@ -138,59 +138,145 @@ def post_process_miles(seconds, miles, days): corrected_miles = [mile if mile > 0 else math.nan for mile in corrected_miles] return corrected_miles + +def post_process_clocks(seconds, clocks, days): + good = [] + for i in range(1, len(seconds) - 2): + if math.isnan(clocks[i]) or clocks[i] < 60 or clocks[i] > 780: + continue + if days[i] is None or days[i] == 'score': + continue + # handle big jumps to apparently good data + if good and (seconds[i] - seconds[good[-1]] < 120): + if clocks[i] - clocks[good[-1]] > math.ceil((seconds[i] - seconds[good[-1]]) / 60): + continue + if clocks[i] - clocks[good[-1]] < math.floor((seconds[i] - seconds[good[-1]]) / 60): + continue + if (clocks[i] - clocks[i - 1]) in [0, 1] and (clocks[i + 1] - clocks[i]) in [0, 1]: + good.append(i) + + corrected_clocks = [clocks[i] if i in good else 0. for i in range(len(clocks))] + for i in range(len(seconds)): + if corrected_clocks[i]: + continue + if days[i] is None or days[i] == 'score': + continue + for j in range(i): + if 59.5 <= (seconds[i] - seconds[j]) <= 60.5: + if corrected_clocks[j]: + corrected_clocks[i] = corrected_clocks[j] + 1 + break + + return corrected_clocks + @app.route('/bus_synthesizer/latest') @request_stats def latest(): - ago_30_min = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) + ago_30_min = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) query = common.database.query(app.db_manager.get_conn(), """ - SELECT timestamp, odometer, timeofday + SELECT timestamp, odometer, clock, timeofday FROM bus_data WHERE timestamp > %(start)s --AND NOT segment LIKE '%%partial%%' ORDER BY timestamp; """, start=ago_30_min) rows = query.fetchall() - times, miles, days = zip(*rows) + times, miles, clocks, days = zip(*rows) seconds = [(time - times[0]) / datetime.timedelta(seconds=1) for time in times] miles = [math.nan if mile is None else mile for mile in miles] + clocks = [math.nan if clock is None else clock for clock in clocks] corrected_miles = post_process_miles(seconds, miles, days) + corrected_clocks = post_process_clocks(seconds, clocks, days) - raw = times[-1], miles[-1], days[-1] + raw = times[-1], miles[-1], days[-1], clocks[-1] latest = None second_latest = None for i in range(len(times) - 1, -1, -1): if not math.isnan(corrected_miles[i]): if latest is None: - latest = times[i], seconds[i], corrected_miles[i], days[i] + latest = {'time':times[i], + 'second':seconds[i], + 'mile':corrected_miles[i], + 'day':days[i], + 'clock':corrected_clocks[i]} elif second_latest is None: - second_latest = times[i], seconds[i], corrected_miles[i], days[i] + second_latest = {'time':times[i], + 'second':seconds[i], + 'mile':corrected_miles[i], + 'day':days[i], + 'clock':corrected_clocks[i]} else: break - - if latest is not None: - processed = latest[0], latest[2], latest[3] + + if latest is not None and latest['clock']: + if latest['day'] == 'day': + # before 7:30 is pm + is_pm = latest['clock'] < 7 * 60 + 30 + elif latest['day'] == 'dusk': + is_pm = True + elif latest['day'] == 'night': + # after 8:00 is pm + is_pm = latest['clock'] >= 8 * 60 + else: # dawn - game does not go back to day + # before 6:40 is pm + if latest['clock'] < 6 * 60 + 40: + is_pm = True + elif latest['clock'] >= 7 * 60: + is_pm = True + else: + # 6:40 to 7:00 is ambiguous; look back 21 min + twenty_one = None + for i in range(len(times)): + if (21 * 60 - 0.5) <= (latest['second'] - seconds[i]) <= (21 * 60 + 0.5): + twenty_one = i + break + if twenty_one is not None and days[twenty_one] == 'night': + is_pm = False + else: + is_pm = True + + hour = latest['clock'] // 60 + minute = latest['clock'] % 60 + if is_pm: + hour += 12 + proccessed_clock = '{}:{:02d}'.format(hour, minute) else: - processed = (None, None, None) + proccessed_clock = None + if latest is not None: + processed = latest['time'], latest['mile'], latest['day'], proccessed_clock + else: + processed = (None, None, None, None) + if second_latest is not None: - m = (latest[2] - second_latest[2]) / (latest[1] - second_latest[1]) - b = latest[2] - m * latest[1] + m = (latest['mile'] - second_latest['mile']) / (latest['second'] - second_latest['second']) + b = latest['mile'] - m * latest['second'] now = datetime.datetime.utcnow() now_second = (now - times[0]) / datetime.timedelta(seconds=1) - predicted = now, m * now_second + b, days[-1] + if latest['clock']: + diff = int(math.floor((now - latest['time']) / datetime.timedelta(minutes = 1))) + new_clock = hour * 60 + minute + diff + minute = new_clock % 60 + hour = (new_clock // 60) % 24 + predicted_clock = '{}:{:02d}'.format(hour, minute) + else: + predicted_clock = None + + predicted = now, m * now_second + b, days[-1], predicted_clock else: - predicted = None, None, None - + predicted = None, None, None, None + output = {'raw':tuple_to_dict(raw), 'post_processed':tuple_to_dict(processed), 'predicted':tuple_to_dict(predicted), } + return to_json(output) -def tuple_to_dict(t, names=['time', 'mile', 'ToD']): +def tuple_to_dict(t, names=['time', 'mile', 'ToD', 'clock']): return {names[i]:t[i] for i in range(len(t))} From e2cf8c4bdeac133df86f834f56a35b34ba41a9d6 Mon Sep 17 00:00:00 2001 From: Christopher Usher Date: Wed, 13 Nov 2024 11:24:51 -0800 Subject: [PATCH 4/4] Fix to handle cases where there are no good odometer readings --- bus_synthesizer/bus_synthesizer/main.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bus_synthesizer/bus_synthesizer/main.py b/bus_synthesizer/bus_synthesizer/main.py index 0002594..ad723cf 100644 --- a/bus_synthesizer/bus_synthesizer/main.py +++ b/bus_synthesizer/bus_synthesizer/main.py @@ -65,6 +65,10 @@ def post_process_miles(seconds, miles, days): continue good.append(i) + # if there are no 'good' odometer readings, bail on post processing + if len(good) == 0: + return [math.nan for i in range(len(miles))] + corrected_miles = [miles[i] if i in good else 0. for i in range(len(miles))] # identify groups of suspicious data and correct them for k, g in itertools.groupby(enumerate(suspect), lambda x:x[0]-x[1]):