From 5cec6ec96ea313946227b3817d090fda17a294e6 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 27 Jun 2019 17:03:25 -0700 Subject: [PATCH] cutter: Reconnect after any error that might be a database error After certain kinds of DB error (eg. lost conn), we need to make a new conn to have things work again. To be safe, we just do it after every error where it might be a problem. --- cutter/cutter/main.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/cutter/cutter/main.py b/cutter/cutter/main.py index 260b47c..8fe62fd 100644 --- a/cutter/cutter/main.py +++ b/cutter/cutter/main.py @@ -59,7 +59,7 @@ class Cutter(object): ERROR_RETRY_INTERVAL = 5 RETRYABLE_UPLOAD_ERROR_WAIT_INTERVAL = 5 - def __init__(self, youtube, conn, stop, name, segments_path): + def __init__(self, youtube, dbmanager, stop, name, segments_path): """youtube is an authenticated and initialized youtube api client. Conn is a database connection. Stop is an Event triggering graceful shutdown when set. @@ -68,10 +68,11 @@ class Cutter(object): """ self.name = name self.youtube = youtube - self.conn = conn + self.dbmanager = dbmanager self.stop = stop self.segments_path = segments_path self.logger = logging.getLogger(type(self).__name__) + self.refresh_conn() def wait(self, interval): """Wait for INTERVAL with jitter, unless we're stopping""" @@ -90,6 +91,11 @@ class Cutter(object): continue self.cut_job(job) + def refresh_conn(self): + """After errors, we reconnect in case the error was connection-related.""" + self.logger.debug("Reconnecting to DB") + self.conn = self.dbmanager.get_conn() + def find_candidate(self): """List EDITED events and find one at random which we have all segments for (or for which allow_holes is true), returning a CutJob. @@ -100,6 +106,7 @@ class Cutter(object): candidates = self.list_candidates() except Exception: self.logger.exception("Error while listing candidates") + self.refresh_conn() self.wait(self.ERROR_RETRY_INTERVAL) continue if candidates: @@ -132,6 +139,7 @@ class Cutter(object): """, id=candidate.id, error='{}: Error while checking candidate: {}'.format(self.name, e)) except Exception: self.logger.exception("Failed to set error for candidate {}, ignoring".format(format_job(candidate))) + self.refresh_conn() else: if result.rowcount > 0: assert result.rowcount == 1 @@ -190,6 +198,7 @@ class Cutter(object): except Exception: # Rather than retry on failure here, just assume someone else claimed it in the meantime self.logger.exception("Error while claiming job {}, aborting claim".format(format_job(job))) + self.refresh_conn() self.wait(self.ERROR_RETRY_INTERVAL) raise CandidateGone if result.rowcount == 0: @@ -306,6 +315,8 @@ class Cutter(object): # we're aborting the cut, error handling has already happened return except Exception as ex: + self.refresh_conn() + # for HTTPErrors, getting http response body is also useful if isinstance(ex, requests.HTTPError): ex = "{}: {}".format(ex, ex.response.content) @@ -487,8 +498,8 @@ def main(dbconnect, youtube_creds_file, name=None, base_dir=".", metrics_port=80 client_secret=youtube_creds['client_secret'], refresh_token=youtube_creds['refresh_token'], ) - cutter = Cutter(youtube, dbmanager.get_conn(), stop, name, base_dir) - transcode_checker = TranscodeChecker(youtube, dbmanager.get_conn(), stop) + cutter = Cutter(youtube, dbmanager, stop, name, base_dir) + transcode_checker = TranscodeChecker(youtube, dbmanager, stop) jobs = [ gevent.spawn(cutter.run), gevent.spawn(transcode_checker.run),