pull/483/merge
Christopher Usher 2 weeks ago committed by GitHub
commit 54ebfd5be6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,16 @@
import gevent.monkey
gevent.monkey.patch_all()
import logging
import os
import argh
from thrimshim.main import main
LOG_FORMAT = "[%(asctime)s] %(levelname)8s %(name)s(%(module)s:%(lineno)d): %(message)s"
level = os.environ.get('WUBLOADER_LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=level, format=LOG_FORMAT)
argh.dispatch_command(main)

@ -0,0 +1,312 @@
import datetime
import itertools
import json
import math
import operator
import argh
import base64
import flask
import gevent
import gevent.backdoor
from gevent.pywsgi import WSGIServer
import common
from common import database
from common.flask_stats import request_stats, after_request
app = flask.Flask('bus_synthesizer')
app.after_request(after_request)
MAX_SPEED = 45 / 3600
def cors(app):
"""WSGI middleware that sets CORS headers"""
HEADERS = [
("Access-Control-Allow-Credentials", "false"),
("Access-Control-Allow-Headers", "*"),
("Access-Control-Allow-Methods", "GET,POST,HEAD"),
("Access-Control-Allow-Origin", "*"),
("Access-Control-Max-Age", "86400"),
]
def handle(environ, start_response):
def _start_response(status, headers, exc_info=None):
headers += HEADERS
return start_response(status, headers, exc_info)
return app(environ, _start_response)
return handle
def post_process_miles(seconds, miles, days):
good = []
suspect = []
for i in range(1, len(seconds) - 1):
if math.isnan(miles[i]) or miles[i] <= 100:
suspect.append(i)
continue
if days[i] is None or days[i] == 'score':
suspect.append(i)
continue
previous_diff = miles[i] - miles[i - 1]
if previous_diff < 0 or previous_diff > MAX_SPEED * (seconds[i] - seconds[i - 1]):
suspect.append(i)
continue
next_diff = miles[i + 1] - miles[i]
if next_diff < 0 or next_diff > MAX_SPEED * (seconds[i + 1] - seconds[i]):
suspect.append(i)
continue
# handle big jumps to apparently good data
if good and miles[i] - miles[good[-1]] > MAX_SPEED * (seconds[i] - seconds[good[-1]]):
suspect.append(i)
continue
# try to filter out bad data at the start
if not good and miles[i] > 1000:
suspect.append(i)
continue
good.append(i)
# if there are no 'good' odometer readings, bail on post processing
if len(good) == 0:
return [math.nan for i in range(len(miles))]
corrected_miles = [miles[i] if i in good else 0. for i in range(len(miles))]
# identify groups of suspicious data and correct them
for k, g in itertools.groupby(enumerate(suspect), lambda x:x[0]-x[1]):
group = map(operator.itemgetter(1), g)
group = list(map(int, group))
to_fix = []
for i in group:
back = 1
# check whether any suspicious data is likely valid and mark it as not suspicious
while True:
if corrected_miles[i - back]:
diff = miles[i] - corrected_miles[i - back]
max_diff = MAX_SPEED * (seconds[i] - seconds[i - back])
forward_diff = miles[group[-1] + 1] - miles[i]
forward_max_diff = MAX_SPEED * (seconds[group[-1] + 1] - seconds[i])
if diff >= 0 and diff <= max_diff and forward_diff <= forward_max_diff:
corrected_miles[i] = miles[i]
break
else:
back += 1
if not corrected_miles[i]:
to_fix.append(i)
# actually fix remaining suspicious data via linear interpolation
for k, g in itertools.groupby(enumerate(to_fix), lambda x:x[0]-x[1]):
subgroup = map(operator.itemgetter(1), g)
subgroup = list(map(int, subgroup))
# ignore data from before the first good measurement or after crashes
if subgroup[0] < good[0] or corrected_miles[subgroup[0] - 1] > corrected_miles[subgroup[-1] + 1]:
continue
m = (corrected_miles[subgroup[-1] + 1] - corrected_miles[subgroup[0] - 1]) / (seconds[subgroup[-1] + 1] - seconds[subgroup[0] - 1])
b = corrected_miles[subgroup[-1] + 1] - m * seconds[subgroup[-1] + 1]
for i in subgroup:
corrected_miles[i] = m * seconds[i] + b
# custom handling of the start
if 0 <= corrected_miles[1] - miles[0] <= MAX_SPEED * (seconds[1] - seconds[0]):
corrected_miles[0] = miles[0]
# custom handling of the end
# find the most recent good value
for latest in range(len(seconds) - 1, -1, -1):
if corrected_miles[latest]:
break
to_fix = []
for i in range(latest + 1, len(seconds)):
back = 1
while True:
if corrected_miles[i - back]:
diff = miles[i] - corrected_miles[i - back]
max_diff = MAX_SPEED * (seconds[i] - seconds[i - back])
if diff >= 0 and diff <= max_diff:
corrected_miles[i] = miles[i]
break
else:
back += 1
if not corrected_miles[i]:
to_fix.append(i)
# linear interpolation of the end
for k, g in itertools.groupby(enumerate(to_fix), lambda x:x[0]-x[1]):
subgroup = map(operator.itemgetter(1), g)
subgroup = list(map(int, subgroup))
# ignore the last data point or after crashes
if subgroup[-1] == (len(corrected_miles) - 1) or corrected_miles[subgroup[0] - 1] > corrected_miles[subgroup[-1] + 1]:
continue
m = (corrected_miles[subgroup[-1] + 1] - corrected_miles[subgroup[0] - 1]) / (seconds[subgroup[-1] + 1] - seconds[subgroup[0] - 1])
for i in subgroup:
corrected_miles[i] = m * seconds[i] + b
corrected_miles = [mile if mile > 0 else math.nan for mile in corrected_miles]
return corrected_miles
def post_process_clocks(seconds, clocks, days):
good = []
for i in range(1, len(seconds) - 2):
if math.isnan(clocks[i]) or clocks[i] < 60 or clocks[i] > 780:
continue
if days[i] is None or days[i] == 'score':
continue
# handle big jumps to apparently good data
if good and (seconds[i] - seconds[good[-1]] < 120):
if clocks[i] - clocks[good[-1]] > math.ceil((seconds[i] - seconds[good[-1]]) / 60):
continue
if clocks[i] - clocks[good[-1]] < math.floor((seconds[i] - seconds[good[-1]]) / 60):
continue
if (clocks[i] - clocks[i - 1]) in [0, 1] and (clocks[i + 1] - clocks[i]) in [0, 1]:
good.append(i)
corrected_clocks = [clocks[i] if i in good else 0. for i in range(len(clocks))]
for i in range(len(seconds)):
if corrected_clocks[i]:
continue
if days[i] is None or days[i] == 'score':
continue
for j in range(i):
if 59.5 <= (seconds[i] - seconds[j]) <= 60.5:
if corrected_clocks[j]:
corrected_clocks[i] = corrected_clocks[j] + 1
break
return corrected_clocks
@app.route('/bus_synthesizer/latest')
@request_stats
def latest():
ago_30_min = datetime.datetime.utcnow() - datetime.timedelta(minutes=30)
query = common.database.query(app.db_manager.get_conn(), """
SELECT timestamp, odometer, clock, timeofday
FROM bus_data
WHERE timestamp > %(start)s
--AND NOT segment LIKE '%%partial%%'
ORDER BY timestamp;
""", start=ago_30_min)
rows = query.fetchall()
times, miles, clocks, days = zip(*rows)
seconds = [(time - times[0]) / datetime.timedelta(seconds=1) for time in times]
miles = [math.nan if mile is None else mile for mile in miles]
clocks = [math.nan if clock is None else clock for clock in clocks]
corrected_miles = post_process_miles(seconds, miles, days)
corrected_clocks = post_process_clocks(seconds, clocks, days)
raw = times[-1], miles[-1], days[-1], clocks[-1]
latest = None
second_latest = None
for i in range(len(times) - 1, -1, -1):
if not math.isnan(corrected_miles[i]):
if latest is None:
latest = {'time':times[i],
'second':seconds[i],
'mile':corrected_miles[i],
'day':days[i],
'clock':corrected_clocks[i]}
elif second_latest is None:
second_latest = {'time':times[i],
'second':seconds[i],
'mile':corrected_miles[i],
'day':days[i],
'clock':corrected_clocks[i]}
else:
break
if latest is not None and latest['clock']:
if latest['day'] == 'day':
# before 7:30 is pm
is_pm = latest['clock'] < 7 * 60 + 30
elif latest['day'] == 'dusk':
is_pm = True
elif latest['day'] == 'night':
# after 8:00 is pm
is_pm = latest['clock'] >= 8 * 60
else: # dawn - game does not go back to day
# before 6:40 is pm
if latest['clock'] < 6 * 60 + 40:
is_pm = True
elif latest['clock'] >= 7 * 60:
is_pm = True
else:
# 6:40 to 7:00 is ambiguous; look back 21 min
twenty_one = None
for i in range(len(times)):
if (21 * 60 - 0.5) <= (latest['second'] - seconds[i]) <= (21 * 60 + 0.5):
twenty_one = i
break
if twenty_one is not None and days[twenty_one] == 'night':
is_pm = False
else:
is_pm = True
hour = latest['clock'] // 60
minute = latest['clock'] % 60
if is_pm:
hour += 12
proccessed_clock = '{}:{:02d}'.format(hour, minute)
else:
proccessed_clock = None
if latest is not None:
processed = latest['time'], latest['mile'], latest['day'], proccessed_clock
else:
processed = (None, None, None, None)
if second_latest is not None:
m = (latest['mile'] - second_latest['mile']) / (latest['second'] - second_latest['second'])
b = latest['mile'] - m * latest['second']
now = datetime.datetime.utcnow()
now_second = (now - times[0]) / datetime.timedelta(seconds=1)
if latest['clock']:
diff = int(math.floor((now - latest['time']) / datetime.timedelta(minutes = 1)))
new_clock = hour * 60 + minute + diff
minute = new_clock % 60
hour = (new_clock // 60) % 24
predicted_clock = '{}:{:02d}'.format(hour, minute)
else:
predicted_clock = None
predicted = now, m * now_second + b, days[-1], predicted_clock
else:
predicted = None, None, None, None
output = {'raw':tuple_to_dict(raw),
'post_processed':tuple_to_dict(processed),
'predicted':tuple_to_dict(predicted),
}
return to_json(output)
def tuple_to_dict(t, names=['time', 'mile', 'ToD', 'clock']):
return {names[i]:t[i] for i in range(len(t))}
# copied from thrimshim
def to_json(obj):
def convert(value):
if isinstance(value, datetime.datetime):
return value.isoformat()
if isinstance(value, datetime.timedelta):
return value.total_seconds()
if isinstance(value, memoryview) or isinstance(value, bytes):
return base64.b64encode(bytes(value)).decode()
raise TypeError(f"Can't convert object of type {value.__class__.__name__} to JSON: {value}")
return json.dumps(obj, default=convert)
def main(connection_string, host='0.0.0.0', port=8004, backdoor_port=0):
server = WSGIServer((host, port), cors(app))
app.db_manager = database.DBManager(dsn=connection_string)
common.PromLogCountsHandler.install()
common.install_stacksampler()
if backdoor_port:
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
common.serve_with_graceful_shutdown(server)
Loading…
Cancel
Save