From 9b73d5cc8510097b79a9e4108c946cf027d71f11 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Sun, 31 Oct 2021 13:02:59 +1100 Subject: [PATCH] Improve WSGIServer graceful shutdown handling Previously both restreamer and thrimshim had some complex logic for dealing with graceful shutdown, in different ways, that was still prone to race conditions. We replace this with a common method that does it properly. Fixes #226 --- common/common/__init__.py | 28 ++++++++++++++++++++++++++++ restreamer/restreamer/main.py | 12 ++---------- thrimshim/thrimshim/main.py | 18 ++---------------- 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/common/common/__init__.py b/common/common/__init__.py index 299db19..f906d54 100644 --- a/common/common/__init__.py +++ b/common/common/__init__.py @@ -2,8 +2,12 @@ """A place for common utilities between wubloader components""" import datetime import errno +import logging import os import random +from signal import SIGTERM + +import gevent.event from .segments import get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, parse_segment_path, SegmentInfo from .stats import timed, PromLogCountsHandler, install_stacksampler @@ -122,3 +126,27 @@ def writeall(write, value): raise Exception("Wrote 0 chars while calling {} with {}-char {}".format(write, len(value), type(value).__name__)) # remove the first n chars and go again if we have anything left value = value[n:] + + +def serve_with_graceful_shutdown(server, stop_timeout=20): + """Takes a gevent.WSGIServer and serves forever until SIGTERM is received, + or the server errors. This is slightly tricky to do due to race conditions + between server.stop() and server.start(). + In particular if start() is called after stop(), then the server will not be stopped. + To be safe, we must set up our own flag indicating we should stop, and ensure that + start() has fully completed before we call stop(). + """ + stopping = gevent.event.Event() + def stop(): + logging.debug("Stop flag set") + stopping.set() + gevent.signal_handler(SIGTERM, stop) + + logging.info("Starting up") + server.start() + logging.debug("Started") + + stopping.wait() + logging.info("Shutting down") + server.stop(stop_timeout) + logging.info("Gracefully shut down") diff --git a/restreamer/restreamer/main.py b/restreamer/restreamer/main.py index ffef50f..a9ba4c3 100644 --- a/restreamer/restreamer/main.py +++ b/restreamer/restreamer/main.py @@ -5,7 +5,6 @@ import functools import json import logging import os -import signal import subprocess from uuid import uuid4 @@ -15,7 +14,7 @@ import prometheus_client as prom from flask import Flask, url_for, request, abort, Response from gevent.pywsgi import WSGIServer -from common import dateutil, get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, PromLogCountsHandler, install_stacksampler +from common import dateutil, get_best_segments, rough_cut_segments, fast_cut_segments, full_cut_segments, PromLogCountsHandler, install_stacksampler, serve_with_graceful_shutdown from common.flask_stats import request_stats, after_request from common.segments import feed_input, render_segments_waveform @@ -432,17 +431,10 @@ def main(host='0.0.0.0', port=8000, base_dir='.', backdoor_port=0): app.static_folder = base_dir server = WSGIServer((host, port), cors(app)) - def stop(): - logging.info("Shutting down") - server.stop() - gevent.signal_handler(signal.SIGTERM, stop) - PromLogCountsHandler.install() install_stacksampler() if backdoor_port: gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() - logging.info("Starting up") - server.serve_forever() - logging.info("Gracefully shut down") + serve_with_graceful_shutdown(server) diff --git a/thrimshim/thrimshim/main.py b/thrimshim/thrimshim/main.py index 36790ff..d2ccfa2 100644 --- a/thrimshim/thrimshim/main.py +++ b/thrimshim/thrimshim/main.py @@ -420,19 +420,6 @@ def main( app.title_header = "" if title_header is None else "{} - ".format(title_header) app.description_footer = "" if description_footer is None else "\n\n{}".format(description_footer) app.upload_locations = upload_locations.split(',') if upload_locations else [] - - stopping = gevent.event.Event() - def stop(): - logging.info("Shutting down") - stopping.set() - # handle when the server is running - if hasattr(server, 'socket'): - server.stop() - # and when not - else: - sys.exit() - gevent.signal_handler(signal.SIGTERM, stop) - app.db_manager = database.DBManager(dsn=connection_string) common.PromLogCountsHandler.install() @@ -441,8 +428,7 @@ def main( if backdoor_port: gevent.backdoor.BackdoorServer(('127.0.0.1', backdoor_port), locals=locals()).start() - logging.info('Starting up') if app.no_authentication: logging.warning('Not authenticating POST requests') - server.serve_forever() - logging.info("Gracefully shut down") + + common.serve_with_graceful_shutdown(server)