Improve WSGIServer graceful shutdown handling

Previously both restreamer and thrimshim had some complex logic for dealing with
graceful shutdown, in different ways, that was still prone to race conditions.

We replace this with a common method that does it properly.

Fixes #226
pull/244/head
Mike Lang 3 years ago committed by Mike Lang
parent 6e3b2e767e
commit 7649a4e840

@ -2,8 +2,12 @@
"""A place for common utilities between wubloader components""" """A place for common utilities between wubloader components"""
import datetime import datetime
import errno import errno
import logging
import os import os
import random import random
from signal import SIGTERM
import gevent.event
from .segments import get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, parse_segment_path, SegmentInfo from .segments import get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, parse_segment_path, SegmentInfo
from .stats import timed, PromLogCountsHandler, install_stacksampler from .stats import timed, PromLogCountsHandler, install_stacksampler
@ -122,3 +126,27 @@ def writeall(write, value):
raise Exception("Wrote 0 chars while calling {} with {}-char {}".format(write, len(value), type(value).__name__)) raise Exception("Wrote 0 chars while calling {} with {}-char {}".format(write, len(value), type(value).__name__))
# remove the first n chars and go again if we have anything left # remove the first n chars and go again if we have anything left
value = value[n:] value = value[n:]
def serve_with_graceful_shutdown(server, stop_timeout=20):
"""Takes a gevent.WSGIServer and serves forever until SIGTERM is received,
or the server errors. This is slightly tricky to do due to race conditions
between server.stop() and server.start().
In particular if start() is called after stop(), then the server will not be stopped.
To be safe, we must set up our own flag indicating we should stop, and ensure that
start() has fully completed before we call stop().
"""
stopping = gevent.event.Event()
def stop():
logging.debug("Stop flag set")
stopping.set()
gevent.signal_handler(SIGTERM, stop)
logging.info("Starting up")
server.start()
logging.debug("Started")
stopping.wait()
logging.info("Shutting down")
server.stop(stop_timeout)
logging.info("Gracefully shut down")

@ -5,7 +5,6 @@ import functools
import json import json
import logging import logging
import os import os
import signal
import subprocess import subprocess
from uuid import uuid4 from uuid import uuid4
@ -15,7 +14,7 @@ import prometheus_client as prom
from flask import Flask, url_for, request, abort, Response from flask import Flask, url_for, request, abort, Response
from gevent.pywsgi import WSGIServer from gevent.pywsgi import WSGIServer
from common import dateutil, get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, PromLogCountsHandler, install_stacksampler from common import dateutil, get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, PromLogCountsHandler, install_stacksampler, serve_with_graceful_shutdown
from common.flask_stats import request_stats, after_request from common.flask_stats import request_stats, after_request
from common.segments import feed_input, render_segments_waveform from common.segments import feed_input, render_segments_waveform
@ -432,17 +431,10 @@ def main(host='0.0.0.0', port=8000, base_dir='.', backdoor_port=0):
app.static_folder = base_dir app.static_folder = base_dir
server = WSGIServer((host, port), cors(app)) server = WSGIServer((host, port), cors(app))
def stop():
logging.info("Shutting down")
server.stop()
gevent.signal_handler(signal.SIGTERM, stop)
PromLogCountsHandler.install() PromLogCountsHandler.install()
install_stacksampler() install_stacksampler()
if backdoor_port: if backdoor_port:
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
logging.info("Starting up") serve_with_graceful_shutdown(server)
server.serve_forever()
logging.info("Gracefully shut down")

@ -420,19 +420,6 @@ def main(
app.title_header = "" if title_header is None else "{} - ".format(title_header) app.title_header = "" if title_header is None else "{} - ".format(title_header)
app.description_footer = "" if description_footer is None else "\n\n{}".format(description_footer) app.description_footer = "" if description_footer is None else "\n\n{}".format(description_footer)
app.upload_locations = upload_locations.split(',') if upload_locations else [] app.upload_locations = upload_locations.split(',') if upload_locations else []
stopping = gevent.event.Event()
def stop():
logging.info("Shutting down")
stopping.set()
# handle when the server is running
if hasattr(server, 'socket'):
server.stop()
# and when not
else:
sys.exit()
gevent.signal_handler(signal.SIGTERM, stop)
app.db_manager = database.DBManager(dsn=connection_string) app.db_manager = database.DBManager(dsn=connection_string)
common.PromLogCountsHandler.install() common.PromLogCountsHandler.install()
@ -441,8 +428,7 @@ def main(
if backdoor_port: if backdoor_port:
gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start()
logging.info('Starting up')
if app.no_authentication: if app.no_authentication:
logging.warning('Not authenticating POST requests') logging.warning('Not authenticating POST requests')
server.serve_forever()
logging.info("Gracefully shut down") common.serve_with_graceful_shutdown(server)

Loading…
Cancel
Save