diff --git a/bus_synthesizer/bus_synthesizer/__init__.py b/bus_synthesizer/bus_synthesizer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bus_synthesizer/bus_synthesizer/__main__.py b/bus_synthesizer/bus_synthesizer/__main__.py new file mode 100644 index 0000000..d0e78f7 --- /dev/null +++ b/bus_synthesizer/bus_synthesizer/__main__.py @@ -0,0 +1,16 @@ +import gevent.monkey +gevent.monkey.patch_all() + +import logging +import os + +import argh + +from thrimshim.main import main + +LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s" + +level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper() +logging.basicConfig(level=level, format=LOG_FORMAT) +argh.dispatch_command(main) + diff --git a/bus_synthesizer/bus_synthesizer/main.py b/bus_synthesizer/bus_synthesizer/main.py new file mode 100644 index 0000000..ad723cf --- /dev/null +++ b/bus_synthesizer/bus_synthesizer/main.py @@ -0,0 +1,312 @@ +import datetime +import itertools +import json +import math +import operator + +import argh +import base64 +import flask +import gevent +import gevent.backdoor +from gevent.pywsgi import WSGIServer + +import common +from common import database +from common.flask_stats import request_stats, after_request + +app = flask.Flask('bus_synthesizer') +app.after_request(after_request) + +MAX_SPEED = 45 / 3600 + +def cors(app): + """WSGI middleware that sets CORS headers""" + HEADERS = [ + ("Access-Control-Allow-Credentials", "false"), + ("Access-Control-Allow-Headers", "*"), + ("Access-Control-Allow-Methods", "GET,POST,HEAD"), + ("Access-Control-Allow-Origin", "*"), + ("Access-Control-Max-Age", "86400"), + ] + def handle(environ, start_response): + def _start_response(status, headers, exc_info=None): + headers += HEADERS + return start_response(status, headers, exc_info) + return app(environ, _start_response) + return handle + + +def post_process_miles(seconds, miles, days): + good = [] + suspect = [] + for i in range(1, len(seconds) - 1): + if math.isnan(miles[i]) or miles[i] <= 100: + suspect.append(i) + continue + if days[i] is None or days[i] == 'score': + suspect.append(i) + continue + previous_diff = miles[i] - miles[i - 1] + if previous_diff < 0 or previous_diff > MAX_SPEED * (seconds[i] - seconds[i - 1]): + suspect.append(i) + continue + next_diff = miles[i + 1] - miles[i] + if next_diff < 0 or next_diff > MAX_SPEED * (seconds[i + 1] - seconds[i]): + suspect.append(i) + continue + # handle big jumps to apparently good data + if good and miles[i] - miles[good[-1]] > MAX_SPEED * (seconds[i] - seconds[good[-1]]): + suspect.append(i) + continue + # try to filter out bad data at the start + if not good and miles[i] > 1000: + suspect.append(i) + continue + good.append(i) + + # if there are no 'good' odometer readings, bail on post processing + if len(good) == 0: + return [math.nan for i in range(len(miles))] + + corrected_miles = [miles[i] if i in good else 0. for i in range(len(miles))] + # identify groups of suspicious data and correct them + for k, g in itertools.groupby(enumerate(suspect), lambda x:x[0]-x[1]): + group = map(operator.itemgetter(1), g) + group = list(map(int, group)) + to_fix = [] + for i in group: + back = 1 + # check whether any suspicious data is likely valid and mark it as not suspicious + while True: + if corrected_miles[i - back]: + diff = miles[i] - corrected_miles[i - back] + max_diff = MAX_SPEED * (seconds[i] - seconds[i - back]) + forward_diff = miles[group[-1] + 1] - miles[i] + forward_max_diff = MAX_SPEED * (seconds[group[-1] + 1] - seconds[i]) + if diff >= 0 and diff <= max_diff and forward_diff <= forward_max_diff: + corrected_miles[i] = miles[i] + break + else: + back += 1 + if not corrected_miles[i]: + to_fix.append(i) + + # actually fix remaining suspicious data via linear interpolation + for k, g in itertools.groupby(enumerate(to_fix), lambda x:x[0]-x[1]): + subgroup = map(operator.itemgetter(1), g) + subgroup = list(map(int, subgroup)) + # ignore data from before the first good measurement or after crashes + if subgroup[0] < good[0] or corrected_miles[subgroup[0] - 1] > corrected_miles[subgroup[-1] + 1]: + continue + m = (corrected_miles[subgroup[-1] + 1] - corrected_miles[subgroup[0] - 1]) / (seconds[subgroup[-1] + 1] - seconds[subgroup[0] - 1]) + b = corrected_miles[subgroup[-1] + 1] - m * seconds[subgroup[-1] + 1] + for i in subgroup: + corrected_miles[i] = m * seconds[i] + b + + # custom handling of the start + if 0 <= corrected_miles[1] - miles[0] <= MAX_SPEED * (seconds[1] - seconds[0]): + corrected_miles[0] = miles[0] + + # custom handling of the end + # find the most recent good value + for latest in range(len(seconds) - 1, -1, -1): + if corrected_miles[latest]: + break + to_fix = [] + for i in range(latest + 1, len(seconds)): + back = 1 + while True: + if corrected_miles[i - back]: + diff = miles[i] - corrected_miles[i - back] + max_diff = MAX_SPEED * (seconds[i] - seconds[i - back]) + if diff >= 0 and diff <= max_diff: + corrected_miles[i] = miles[i] + break + else: + back += 1 + if not corrected_miles[i]: + to_fix.append(i) + + # linear interpolation of the end + for k, g in itertools.groupby(enumerate(to_fix), lambda x:x[0]-x[1]): + subgroup = map(operator.itemgetter(1), g) + subgroup = list(map(int, subgroup)) + # ignore the last data point or after crashes + if subgroup[-1] == (len(corrected_miles) - 1) or corrected_miles[subgroup[0] - 1] > corrected_miles[subgroup[-1] + 1]: + continue + m = (corrected_miles[subgroup[-1] + 1] - corrected_miles[subgroup[0] - 1]) / (seconds[subgroup[-1] + 1] - seconds[subgroup[0] - 1]) + for i in subgroup: + corrected_miles[i] = m * seconds[i] + b + + corrected_miles = [mile if mile > 0 else math.nan for mile in corrected_miles] + return corrected_miles + + +def post_process_clocks(seconds, clocks, days): + good = [] + for i in range(1, len(seconds) - 2): + if math.isnan(clocks[i]) or clocks[i] < 60 or clocks[i] > 780: + continue + if days[i] is None or days[i] == 'score': + continue + # handle big jumps to apparently good data + if good and (seconds[i] - seconds[good[-1]] < 120): + if clocks[i] - clocks[good[-1]] > math.ceil((seconds[i] - seconds[good[-1]]) / 60): + continue + if clocks[i] - clocks[good[-1]] < math.floor((seconds[i] - seconds[good[-1]]) / 60): + continue + if (clocks[i] - clocks[i - 1]) in [0, 1] and (clocks[i + 1] - clocks[i]) in [0, 1]: + good.append(i) + + corrected_clocks = [clocks[i] if i in good else 0. for i in range(len(clocks))] + for i in range(len(seconds)): + if corrected_clocks[i]: + continue + if days[i] is None or days[i] == 'score': + continue + for j in range(i): + if 59.5 <= (seconds[i] - seconds[j]) <= 60.5: + if corrected_clocks[j]: + corrected_clocks[i] = corrected_clocks[j] + 1 + break + + return corrected_clocks + +@app.route('/bus_synthesizer/latest') +@request_stats +def latest(): + ago_30_min = datetime.datetime.utcnow() - datetime.timedelta(minutes=30) + query = common.database.query(app.db_manager.get_conn(), """ + SELECT timestamp, odometer, clock, timeofday + FROM bus_data + WHERE timestamp > %(start)s + --AND NOT segment LIKE '%%partial%%' + ORDER BY timestamp; + """, start=ago_30_min) + rows = query.fetchall() + times, miles, clocks, days = zip(*rows) + + seconds = [(time - times[0]) / datetime.timedelta(seconds=1) for time in times] + miles = [math.nan if mile is None else mile for mile in miles] + clocks = [math.nan if clock is None else clock for clock in clocks] + corrected_miles = post_process_miles(seconds, miles, days) + corrected_clocks = post_process_clocks(seconds, clocks, days) + + raw = times[-1], miles[-1], days[-1], clocks[-1] + + latest = None + second_latest = None + for i in range(len(times) - 1, -1, -1): + if not math.isnan(corrected_miles[i]): + if latest is None: + latest = {'time':times[i], + 'second':seconds[i], + 'mile':corrected_miles[i], + 'day':days[i], + 'clock':corrected_clocks[i]} + elif second_latest is None: + second_latest = {'time':times[i], + 'second':seconds[i], + 'mile':corrected_miles[i], + 'day':days[i], + 'clock':corrected_clocks[i]} + else: + break + + if latest is not None and latest['clock']: + if latest['day'] == 'day': + # before 7:30 is pm + is_pm = latest['clock'] < 7 * 60 + 30 + elif latest['day'] == 'dusk': + is_pm = True + elif latest['day'] == 'night': + # after 8:00 is pm + is_pm = latest['clock'] >= 8 * 60 + else: # dawn - game does not go back to day + # before 6:40 is pm + if latest['clock'] < 6 * 60 + 40: + is_pm = True + elif latest['clock'] >= 7 * 60: + is_pm = True + else: + # 6:40 to 7:00 is ambiguous; look back 21 min + twenty_one = None + for i in range(len(times)): + if (21 * 60 - 0.5) <= (latest['second'] - seconds[i]) <= (21 * 60 + 0.5): + twenty_one = i + break + if twenty_one is not None and days[twenty_one] == 'night': + is_pm = False + else: + is_pm = True + + hour = latest['clock'] // 60 + minute = latest['clock'] % 60 + if is_pm: + hour += 12 + proccessed_clock = '{}:{:02d}'.format(hour, minute) + else: + proccessed_clock = None + + if latest is not None: + processed = latest['time'], latest['mile'], latest['day'], proccessed_clock + else: + processed = (None, None, None, None) + + if second_latest is not None: + m = (latest['mile'] - second_latest['mile']) / (latest['second'] - second_latest['second']) + b = latest['mile'] - m * latest['second'] + now = datetime.datetime.utcnow() + now_second = (now - times[0]) / datetime.timedelta(seconds=1) + if latest['clock']: + diff = int(math.floor((now - latest['time']) / datetime.timedelta(minutes = 1))) + new_clock = hour * 60 + minute + diff + minute = new_clock % 60 + hour = (new_clock // 60) % 24 + predicted_clock = '{}:{:02d}'.format(hour, minute) + else: + predicted_clock = None + + predicted = now, m * now_second + b, days[-1], predicted_clock + else: + predicted = None, None, None, None + + output = {'raw':tuple_to_dict(raw), + 'post_processed':tuple_to_dict(processed), + 'predicted':tuple_to_dict(predicted), + } + + return to_json(output) + + +def tuple_to_dict(t, names=['time', 'mile', 'ToD', 'clock']): + return {names[i]:t[i] for i in range(len(t))} + + +# copied from thrimshim +def to_json(obj): + def convert(value): + if isinstance(value, datetime.datetime): + return value.isoformat() + if isinstance(value, datetime.timedelta): + return value.total_seconds() + if isinstance(value, memoryview) or isinstance(value, bytes): + return base64.b64encode(bytes(value)).decode() + raise TypeError(f"Can't convert object of type {value.__class__.__name__} to JSON: {value}") + return json.dumps(obj, default=convert) + + +def main(connection_string, host='0.0.0.0', port=8004, backdoor_port=0): + + server = WSGIServer((host, port), cors(app)) + + app.db_manager = database.DBManager(dsn=connection_string) + + common.PromLogCountsHandler.install() + common.install_stacksampler() + + if backdoor_port: + gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() + + common.serve_with_graceful_shutdown(server)