chat_archiver: Add prometheus metrics

pull/300/head
Mike Lang 2 years ago committed by Mike Lang
parent c25d79e7c2
commit c1c1c11bce

@@ -9,6 +9,7 @@ import signal
import socket
import time
from calendar import timegm
from collections import defaultdict
from datetime import datetime
from itertools import count
from uuid import uuid4
@@ -17,9 +18,11 @@ import gevent.event
import gevent.queue
from common import ensure_directory
from common.stats import timed
from girc import Client
from monotonic import monotonic
import prometheus_client as prom
# How long each batch is
BATCH_INTERVAL = 60
@@ -49,6 +52,62 @@ MAX_SERVER_LAG = 30
# by up to this amount before and after our best guess
ESTIMATED_TIME_PADDING = 5
messages_received = prom.Counter(
	"messages_received",
	"Number of chat messages received by the client. 'client' tag is per client instance.",
	["channel", "client", "command"],
)
messages_ignored = prom.Counter(
	"messages_ignored",
	"Number of chat messages that were received but ignored due to not being on the allowlist.",
	["channel", "client", "command"],
)
messages_written = prom.Counter(
	"messages_written",
	"Number of chat messages received and then written out to disk in a batch.",
	["channel", "client", "command"],
)
batch_messages = prom.Histogram(
	"batch_messages",
	"Number of messages in batches written to disk",
	["channel", "client"],
	buckets=[0, 1, 4, 16, 64, 256, 1024],
)
# based on DB2021, an average PRIVMSG is about 600 bytes.
# so since batch_messages goes up to 1024, batch_bytes should go up to ~ 600KB.
# let's just call it 1MB.
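# (Checking the arithmetic: 600 bytes/message * 1024 messages = 614400 bytes ~= 600 KiB,
# so a 1 MiB top bucket leaves headroom; boundaries below quadruple from 256 B to 1 MiB.)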
batch_bytes = prom.Histogram(
	"batch_bytes",
	"Size in bytes of batches written to disk",
	["channel", "client"],
	buckets=[0, 256, 1024, 4096, 16384, 65536, 262144, 1048576],
)
open_batches = prom.Gauge(
	"open_batches",
	"Number of batches that have at least one pending message not yet written to disk",
	["channel", "client"],
)
server_lag = prom.Gauge(
	"server_lag",
	"Estimated time difference between server-side timestamps and local time, based on latest message",
	["channel", "client"],
)
merge_pass_duration = prom.Histogram(
	"merge_pass_duration",
	"How long it took to run through all batches and merge any duplicates",
)
merge_pass_merges = prom.Histogram(
	"merge_pass_merges",
	"How many merges (times for which more than one batch existed) were done in a single merge pass",
	buckets=[0, 1, 10, 100, 1000, 10000],
)
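
# Not shown in this diff: something needs to serve these metrics for Prometheus to scrape.
# A minimal sketch, assuming no exporter is started elsewhere in the service (the port is
# hypothetical):
#
#	prom.start_http_server(8008)  # serves /metrics from a background thread
#
# Counters are then exposed with a _total suffix, e.g.
# messages_received_total{channel="...",client="...",command="PRIVMSG"}.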
class Archiver(object):
	def __init__(self, name, base_dir, channel, nick, oauth_token):
@@ -70,6 +129,21 @@ class Archiver(object):
		)
		self.client.channel('#{}'.format(channel)).join()
	def write_batch(self, batch_time, messages):
		# wrapper around the general write_batch() function that also records metrics
		write_batch(
			self.path, batch_time, messages,
			size_histogram=batch_bytes.labels(channel=self.channel, client=id(self)),
		)
		batch_messages.labels(channel=self.channel, client=id(self)).observe(len(messages))
		# incrementing a prom counter can be stupidly expensive, so collect up per-command
		# values and do them in one go
		by_command = defaultdict(lambda: 0)
		for message in messages:
			by_command[message["command"]] += 1
		for command, count in by_command.items():
			messages_written.labels(channel=self.channel, client=id(self), command=command).inc(count)
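		# (With N buffered messages spanning M distinct commands, this does M labels()
		# lookups and locked increments instead of N.)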
	def run(self):
		# wait for twitch to send the initial ROOMSTATE for the room we've joined.
		# everything preceding this message is per-connection stuff we don't care about.
@@ -95,6 +169,7 @@ class Archiver(object):
		last_timestamped_message = None
		# {batch time: [messages]}
		batches = {}
		open_batches.labels(channel=self.channel, client=id(self)).set_function(lambda: len(batches))
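		# (set_function makes the gauge lazy: prometheus_client calls the lambda at scrape
		# time, so this always reports the current len(batches) with no explicit set() calls.)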
		while not (self.stopping.is_set() and self.messages.empty()):
			# wait until we either have a message, are stopping, or a batch can be closed
@@ -112,7 +187,7 @@ class Archiver(object):
			for batch_time, messages in list(batches.items()):
				if now >= batch_time + BATCH_INTERVAL + MAX_SERVER_LAG:
					del batches[batch_time]
					self.write_batch(batch_time, messages)
			# consume a message if any
			try:
@@ -122,6 +197,7 @@ class Archiver(object):
			if message.command not in COMMANDS:
				self.logger.info("Skipping non-whitelisted command: {}".format(message.command))
				messages_ignored.labels(channel=self.channel, client=id(self), command=message.command).inc()
				continue
			self.logger.debug("Got message: {}".format(message))
@@ -131,12 +207,14 @@ class Archiver(object):
			}
			data['receivers'] = {self.name: message.received_at}
			self.logger.debug("Got message: {}".format(data))
			messages_received.labels(channel=self.channel, client=id(self), command=message.command).inc()
			if data['tags'] and 'tmi-sent-ts' in data['tags']:
				# explicit server time is available
				timestamp = int(data['tags']['tmi-sent-ts']) / 1000. # original is int ms
				last_timestamped_message = message
				last_server_time = timestamp
				server_lag.labels(channel=self.channel, client=id(self)).set(time.time() - timestamp)
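				# (Positive lag means the server timestamp trails our wall clock, i.e.
				# delivery delay plus any clock skew between here and Twitch.)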
				time_range = 0
				self.logger.debug("Message has exact timestamp: {}".format(timestamp))
				# check for any non-timestamped messages which we now know must have been
@@ -178,7 +256,7 @@ class Archiver(object):
		# Close any remaining batches
		for batch_time, messages in batches.items():
			self.write_batch(batch_time, messages)
		self.client.wait_for_stop() # re-raise any errors
		self.logger.info("Client stopped")
@@ -187,9 +265,11 @@ class Archiver(object):
		self.client.stop()

def write_batch(path, batch_time, messages, size_histogram=None):
	"""Batches are named PATH/YYYY-MM-DDTHH/MM:SS-HASH.json"""
	output = (format_batch(messages) + '\n').encode('utf-8')
	if size_histogram is not None:
		size_histogram.observe(len(output))
	hash = base64.b64encode(hashlib.sha256(output).digest(), b"-_").decode().rstrip("=")
	hour = datetime.utcfromtimestamp(batch_time).strftime("%Y-%m-%dT%H")
	time = datetime.utcfromtimestamp(batch_time).strftime("%M:%S")
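	# Illustrative example (values hypothetical): a batch at 2021-11-13 21:35:10 UTC becomes
	# PATH/2021-11-13T21/35:10-HASH.json, where HASH is urlsafe base64 of the SHA-256 above.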
@@ -251,6 +331,7 @@ def merge_all(path, interval=None, stopping=None):
		stopping = gevent.event.Event()
	while not stopping.is_set():
		start = monotonic()
		merges_done = 0
		# loop until no changes
		while True:
			logging.debug("Scanning for merges")
@@ -270,11 +351,14 @@ def merge_all(path, interval=None, stopping=None):
					logging.info("Merging {} batches at time {}".format(count, timestamp))
					batch_time = timegm(time.strptime(timestamp, "%Y-%m-%dT%H:%M:%S"))
					merge_batch_files(path, batch_time)
					merges_done += 1
		duration = monotonic() - start
		merge_pass_duration.observe(duration)
		merge_pass_merges.observe(merges_done)
		if interval is None:
			# one-shot
			break
		remaining = interval - duration
		if remaining > 0:
			logging.debug("Waiting {}s for next merge scan".format(remaining))
			stopping.wait(remaining)
@@ -325,6 +409,7 @@ def merge_batch_files(path, batch_time):
		if batch_file not in written:
			os.remove(batch_file)
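
# common.stats.timed is not shown in this diff; it appears to record each call's duration,
# with the normalize= hook presumably dividing that duration by len(left) + len(right) to
# give a per-message cost. The exact semantics here are an assumption.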
@timed("merge_messages", normalize=lambda _, left, right: len(left) + len(right))
def merge_messages(left, right):
	"""Merges two lists of messages into one merged list.
	This operation should be a CRDT, i.e. all the following hold:
@@ -358,7 +443,7 @@ def merge_messages(left, right):
		if k not in ("receivers", "time", "time_range")
	):
	receivers = a["receivers"] | b["receivers"]
	# Error checking - make sure no receiver timestamps are being overwritten.
	# This would indicate we're merging two messages received at different times
	# by the same recipient.
	for k in receivers.keys():
