backfiller database code

pull/58/head
Christopher Usher 5 years ago
parent 3ccace2a73
commit 4b9fbcb7d2

@ -16,8 +16,8 @@ import prometheus_client as prom
import requests import requests
import common import common
import common.dateutil from common import dateutil
from common import database
segments_backfilled = prom.Counter( segments_backfilled = prom.Counter(
'segments_backfilled', 'segments_backfilled',
@ -188,6 +188,9 @@ class BackfillerManager(object):
get_nodes are stopped. If self.run_once, only call nodes once. Calling get_nodes are stopped. If self.run_once, only call nodes once. Calling
stop will exit the loop.""" stop will exit the loop."""
self.logger.info('Starting') self.logger.info('Starting')
if self.node_database is not None:
self.db_manager = database.DBManager(dsn=self.node_database)
failures = 0 failures = 0
while not self.stopping.is_set(): while not self.stopping.is_set():
@ -220,20 +223,19 @@ class BackfillerManager(object):
def get_nodes(self): def get_nodes(self):
"""List address of other wubloaders. """List address of other wubloaders.
This returns a list of the other wubloaders as URL strings. Node URLs This returns a list of the other wubloaders as URL strings. Node URLs
are taken from three places. First, the --static-nodes command line are taken from three places. First, the --static-nodes command line
argument can be used to provide a list of addresses that are always argument can be used to provide a list of URLs that are always
backfilled from. Node names infered from the hostnames of the URLs. backfilled from. Node names infered from the hostnames of the URLs.
Second, addresses are read from the file named in the --node-file Second, nodes are read from the file named in the --node-file command
command line argument. In this file, nodes are listed one per line as line argument. In this file, nodes are listed one per line as name-URL
node name node address pairs or as just node addresses. Lines starting pairs or as just node URLs. Lines starting with '#' are ignored. If
with '#' are ignored. If only the address is provided, the node name is only the URL is provided, the node name is taken from the hostname.
taken from the hostname. Third, node names and addresses can be Third, node names and URLs can be requested from the database given by
requested from the database with the database address. If multiple --node-database. If multiple nodes URLs with the same name are found,
nodes address with the same name are found, only the last is retained only the last is retained and any nodes matching the local hostname
and any nodes matching the local host name (given by the --hostname) (given by the --hostname argument) are ignored to try to prevent this
argument are ignored to try to prevent this node from backfilling from node from backfilling from its self."""
its self."""
nodes = {urlparse.urlparse(node).hostname:node for node in self.static_nodes} nodes = {urlparse.urlparse(node).hostname:node for node in self.static_nodes}
@ -250,8 +252,15 @@ class BackfillerManager(object):
nodes[substrs[0]] = substrs[1] nodes[substrs[0]] = substrs[1]
if self.node_database is not None: if self.node_database is not None:
self.logger.info('Fetching list of nodes from {}'.format(self.node_database)) self.logger.info('Fetching list of nodes from {}'.format(
# TODO query the database urlparse.urlparse(self.node_database).hostname))
conn = self.db_manager.query()
results = database.query(conn, """
SELECT name, url, backfill_from
FROM nodes""")
for row in results:
if row.backfill_from:
nodes[row.name] = row.url
nodes.pop(self.localhost, None) nodes.pop(self.localhost, None)
return nodes.values() return nodes.values()
@ -368,7 +377,7 @@ class BackfillerWorker(object):
@argh.arg('--start', help='If a datetime only backfill hours after that datetime. If a number, bacfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.') @argh.arg('--start', help='If a datetime only backfill hours after that datetime. If a number, bacfill hours more recent than that number of hours ago. If None (default), all hours are backfilled.')
@argh.arg('--run-once', help='If True, backfill only once. By default False.') @argh.arg('--run-once', help='If True, backfill only once. By default False.')
@argh.arg('--node-file', help="Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.") @argh.arg('--node-file', help="Name of file listing nodes to backfill from. One node per line in the form NAME URI with whitespace only lines or lines starting with '#' ignored. If None (default) do not get nodes from a file.")
@argh.arg('--node-database', help='Address of database node to fetch a list of nodes from. If None (default) do not get nodes from database.') @argh.arg('--node-database', help='Postgres conection string for database to fetch a list of nodes from. Either a space-separated list of key=value pairs, or a URI like: postgresql://USER:PASSWORD@HOST/DBNAME?KEY=VALUE . If None (default) do not get nodes from database.')
@argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()') @argh.arg('--localhost', help='Name of local machine. Used to prevent backfilling from itself. By default the result of socket.gethostname()')
def main(streams, base_dir='.', variants='source', metrics_port=8002, def main(streams, base_dir='.', variants='source', metrics_port=8002,
static_nodes='', backdoor_port=0, start=None, run_once=False, static_nodes='', backdoor_port=0, start=None, run_once=False,
@ -385,7 +394,7 @@ def main(streams, base_dir='.', variants='source', metrics_port=8002,
start = float(start) start = float(start)
logging.info('Backfilling last {} hours'.format(start)) logging.info('Backfilling last {} hours'.format(start))
except ValueError: except ValueError:
start = common.dateutil.parse(start) start = dateutil.parse(start)
logging.info('Backfilling since {}'.format(start)) logging.info('Backfilling since {}'.format(start))
common.PromLogCountsHandler.install() common.PromLogCountsHandler.install()

@ -59,6 +59,12 @@ CREATE TABLE IF NOT EXISTS events (
-- Index on state, since that's almost always what we're querying on besides id -- Index on state, since that's almost always what we're querying on besides id
CREATE INDEX IF NOT EXISTS event_state ON events (state); CREATE INDEX IF NOT EXISTS event_state ON events (state);
CREATE TABLE IF NOT EXISTS nodes (
name TEXT PRIMARY KEY,
url TEXT NOT NULL,
backfill_from BOOLEAN NOT NULL DEFAULT TRUE
);
""" """

Loading…
Cancel
Save