Merge pull request #119 from ekimekim/mike/database-resilience

Changes to improve behaviour if the DB is down: DBManager no longer opens a test connection on construction, services verify the database with a retried test connection at startup, and the backfiller connects lazily and discards its connection on error.
Mike Lang authored 5 years ago; committed by GitHub
commit 7183b25ce9

@@ -188,9 +188,8 @@ class BackfillerManager(object):
self.start = start
self.run_once = run_once
self.node_file = node_file
self.node_database = node_database
if self.node_database is not None:
self.db_manager = database.DBManager(dsn=self.node_database)
self.db_manager = None if node_database is None else database.DBManager(dsn=node_database)
self.connection = None
self.localhost = localhost
self.download_concurrency = download_concurrency
self.recent_cutoff = recent_cutoff
@@ -227,8 +226,6 @@ class BackfillerManager(object):
get_nodes are stopped. If self.run_once, only call nodes once. Calling
stop will exit the loop."""
self.logger.info('Starting')
if self.node_database is not None:
self.connection = self.db_manager.get_conn()
failures = 0
while not self.stopping.is_set():
@@ -237,17 +234,12 @@ class BackfillerManager(object):
except Exception:
# To ensure a fresh slate and clear any DB-related errors, get a new conn on error.
# This is heavy-handed but simple and effective.
if self.node_database is not None:
self.connection = self.db_manager.get_conn()
self.connection = None
if failures < MAX_BACKOFF:
failures += 1
delay = common.jitter(TIMEOUT * 2**failures)
self.logger.exception('Getting nodes failed. Retrying in {:.0f} s'.format(delay))
try:
host = [s.split('=')[-1] for s in self.connection.dsn.split() if 'host' in s][0]
except Exception:
host = ''
node_list_errors.labels(filename=self.node_file, database=host).inc()
node_list_errors.labels(filename=self.node_file).inc()
self.stopping.wait(delay)
continue
exisiting_nodes = set(self.workers.keys())
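
As an aside on the retry arithmetic in the hunk above: each consecutive failure doubles the delay until failures reaches MAX_BACKOFF, and common.jitter() spreads the result out so several backfillers don't retry in lockstep. A standalone illustration with made-up constants (the real TIMEOUT and MAX_BACKOFF live in the backfiller module, and this jitter() is only a stand-in for common.jitter):

import random

TIMEOUT = 120      # hypothetical base delay, in seconds
MAX_BACKOFF = 4    # hypothetical cap on the exponent

def jitter(interval):
    # Stand-in for common.jitter: vary the interval a little (+/-10% here)
    # so independent workers don't all wake up at the same moment.
    return interval * (0.9 + 0.2 * random.random())

failures = 0
for attempt in range(1, 7):
    if failures < MAX_BACKOFF:
        failures += 1
    delay = jitter(TIMEOUT * 2 ** failures)
    print('failure {}: retrying in {:.0f} s'.format(attempt, delay))
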
@@ -298,7 +290,9 @@ class BackfillerManager(object):
else:
nodes[substrs[0]] = substrs[1]
if self.node_database is not None:
if self.db_manager is not None:
if self.connection is None:
self.connection = self.db_manager.get_conn()
host = [s.split('=')[-1] for s in self.connection.dsn.split() if 'host' in s][0]
self.logger.info('Fetching list of nodes from {}'.format(host))
results = database.query(self.connection, """
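
For anyone puzzling over the host extraction a few lines up: conn.dsn is psycopg2's space-separated keyword=value connection string, and the comprehension just pulls out the value of its host entry. With illustrative values only:

# Example DSN string as exposed by a psycopg2 connection (values made up).
dsn = 'dbname=wubloader user=vst host=db.example.com port=5432'
host = [s.split('=')[-1] for s in dsn.split() if 'host' in s][0]
print(host)  # -> db.example.com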

@@ -19,8 +19,7 @@ class DBManager(object):
returning them.
It has the ability to serve as a primitive connection pool, as getting a
new conn will return existing conns it knows about first, but this mainly
just exists to re-use the initial conn used to test the connection, and you
new conn will return existing conns it knows about first, but you
should use a real conn pool for any non-trivial use.
Returned conns are set to seralizable isolation level, autocommit, and use
@@ -30,9 +29,6 @@ class DBManager(object):
self.conns = []
self.connect_timeout = connect_timeout
self.connect_kwargs = connect_kwargs
# get a connection to test whether connection is working.
conn = self.get_conn()
self.put_conn(conn)
def put_conn(self, conn):
self.conns.append(conn)
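
With the eager test connection gone from __init__, constructing a DBManager can no longer fail just because the database is unreachable; the first real connection attempt happens inside get_conn(), and anything handed back through put_conn() is reused first, which is exactly the "primitive connection pool" behaviour the trimmed docstring describes. A minimal sketch of that shape, assuming psycopg2 as the driver and leaving out the serializable-isolation/autocommit setup the real class applies (the 30 s timeout default is illustrative):

import psycopg2

class DBManager(object):
    """Hands out database connections, reusing returned ones before opening new ones."""

    def __init__(self, connect_timeout=30, **connect_kwargs):
        # No connection is attempted here any more, so a down database
        # only surfaces when get_conn() is called.
        self.conns = []
        self.connect_timeout = connect_timeout
        self.connect_kwargs = connect_kwargs

    def get_conn(self):
        # Reuse a returned connection if we have one, else open a new one.
        if self.conns:
            return self.conns.pop()
        return psycopg2.connect(connect_timeout=self.connect_timeout, **self.connect_kwargs)

    def put_conn(self, conn):
        self.conns.append(conn)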

@@ -565,13 +565,20 @@ def main(
# We want to error if either errors, and shut down if either exits.
dbmanager = None
stopping = gevent.event.Event()
while dbmanager is None:
try:
dbmanager = DBManager(dsn=dbconnect)
while True:
try:
# Get a test connection so we know the database is up,
# this produces a clearer error in cases where there's a connection problem.
conn = dbmanager.get_conn()
except Exception:
delay = common.jitter(10)
logging.info('Cannot connect to database. Retrying in {:.0f} s'.format(delay))
stop.wait(delay)
else:
# put it back so it gets reused on next get_conn()
dbmanager.put_conn(conn)
break
with open(creds_file) as f:
credentials = json.load(f)
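
Because the constructor no longer proves the database is reachable, the startup code above takes over that job: grab one connection purely as a health check, retry with a short jittered delay while the database is down, then hand the connection back so the first real get_conn() reuses it. The same idiom as a standalone helper; wait_for_db, the stop event argument and the 10 s interval are this sketch's own choices, not names from the repo:

import logging
import random

def jitter(interval):
    # Stand-in for common.jitter: randomize the delay slightly.
    return interval * (0.9 + 0.2 * random.random())

def wait_for_db(dbmanager, stop, interval=10):
    """Block until the database accepts a connection, or until stop is set."""
    while not stop.is_set():
        try:
            # A test connection gives a clear, early error if the DB is down.
            conn = dbmanager.get_conn()
        except Exception:
            delay = jitter(interval)
            logging.info('Cannot connect to database. Retrying in {:.0f} s'.format(delay))
            stop.wait(delay)
        else:
            # Put it back so it gets reused on the next get_conn().
            dbmanager.put_conn(conn)
            return True
    return False

# Usage, roughly mirroring the hunk above (stop can be a threading.Event
# or gevent.event.Event):
#   dbmanager = DBManager(dsn=dbconnect)
#   wait_for_db(dbmanager, stop)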

@@ -130,6 +130,8 @@ class SheetSync(object):
sync_errors.inc()
# To ensure a fresh slate and clear any DB-related errors, get a new conn on error.
# This is heavy-handed but simple and effective.
# If we can't re-connect, the program will crash from here,
# then restart and wait until it can connect again.
self.conn = self.dbmanager.get_conn()
self.wait(self.ERROR_RETRY_INTERVAL)
else:
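
The new comment spells out what this error path now relies on: the reconnect via get_conn() sits outside any further try/except, so if the database is still unreachable the exception escapes, the process exits, and the supervisor it runs under (an assumption here, e.g. a container restart policy) brings it back up into the startup wait-for-DB loop. A toy version of that "reconnect or crash and restart" policy:

import logging
import time

class ReconnectOrCrash(object):
    # Hypothetical demonstration class, not part of the repo.
    ERROR_RETRY_INTERVAL = 10  # illustrative value

    def __init__(self, dbmanager, do_one_sync):
        self.dbmanager = dbmanager
        self.do_one_sync = do_one_sync
        self.conn = self.dbmanager.get_conn()

    def run(self):
        while True:
            try:
                self.do_one_sync(self.conn)
            except Exception:
                logging.exception('Sync failed')
                # Heavy-handed but simple: throw away possibly-broken state by
                # taking a brand new connection. If even this raises, the
                # exception escapes run(), the process dies, and a restart
                # lands back in the startup retry loop.
                self.conn = self.dbmanager.get_conn()
                time.sleep(self.ERROR_RETRY_INTERVAL)
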
@@ -293,14 +295,20 @@ def main(dbconnect, sheets_creds_file, edit_url, bustime_start, sheet_id, worksh
logging.info("Starting up")
dbmanager = None
while dbmanager is None:
try:
dbmanager = DBManager(dsn=dbconnect)
while True:
try:
# Get a test connection so we know the database is up,
# this produces a clearer error in cases where there's a connection problem.
conn = dbmanager.get_conn()
except Exception:
delay = common.jitter(10)
logging.info('Cannot connect to database. Retrying in {:.0f} s'.format(delay))
stop.wait(delay)
else:
# put it back so it gets reused on next get_conn()
dbmanager.put_conn(conn)
break
sheets_creds = json.load(open(sheets_creds_file))

@@ -347,14 +347,7 @@ def main(connection_string, default_channel, bustime_start, host='0.0.0.0', port
sys.exit()
gevent.signal(signal.SIGTERM, stop)
app.db_manager = None
while app.db_manager is None and not stopping.is_set():
try:
app.db_manager = database.DBManager(dsn=connection_string)
except Exception:
delay = common.jitter(10)
logging.info('Cannot connect to database. Retrying in {:.0f} s'.format(delay))
stopping.wait(delay)
common.PromLogCountsHandler.install()
common.install_stacksampler()
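
Here the old construct-and-retry loop is dropped outright: since DBManager() no longer connects in its constructor there is nothing left to retry at this point, and connection problems only surface later, when a request first needs the database. The replacement line isn't visible in this hunk, but from the names in the surrounding context the assignment presumably becomes a plain:

# Assumption: likely replacement for the removed loop (not shown in the hunk above);
# names taken from the surrounding context.
app.db_manager = database.DBManager(dsn=connection_string)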
