|
|
@ -113,7 +113,7 @@ merge_pass_merges = prom.Histogram(
|
|
|
|
|
|
|
|
|
|
|
|
class Archiver(object):
|
|
|
|
class Archiver(object):
|
|
|
|
def __init__(self, name, base_dir, channel, nick, oauth_token):
|
|
|
|
def __init__(self, name, base_dir, channel, nick, oauth_token):
|
|
|
|
self.logger = logging.getLogger(type(self).__name__).getChild(channel)
|
|
|
|
self.logger = logging.getLogger(type(self).__name__).getChild(name)
|
|
|
|
self.name = name
|
|
|
|
self.name = name
|
|
|
|
self.messages = gevent.queue.Queue()
|
|
|
|
self.messages = gevent.queue.Queue()
|
|
|
|
self.channel = channel
|
|
|
|
self.channel = channel
|
|
|
@ -137,16 +137,16 @@ class Archiver(object):
|
|
|
|
# wrapper around general write_batch() function
|
|
|
|
# wrapper around general write_batch() function
|
|
|
|
write_batch(
|
|
|
|
write_batch(
|
|
|
|
self.path, batch_time, messages,
|
|
|
|
self.path, batch_time, messages,
|
|
|
|
size_histogram=batch_bytes.labels(channel=self.channel, client=id(self)),
|
|
|
|
size_histogram=batch_bytes.labels(channel=self.channel, client=self.name),
|
|
|
|
)
|
|
|
|
)
|
|
|
|
batch_messages.labels(channel=self.channel, client=id(self)).observe(len(messages))
|
|
|
|
batch_messages.labels(channel=self.channel, client=self.name).observe(len(messages))
|
|
|
|
# incrementing a prom counter can be stupidly expensive, collect up per-command values
|
|
|
|
# incrementing a prom counter can be stupidly expensive, collect up per-command values
|
|
|
|
# so we can do them in one go
|
|
|
|
# so we can do them in one go
|
|
|
|
by_command = defaultdict(lambda: 0)
|
|
|
|
by_command = defaultdict(lambda: 0)
|
|
|
|
for message in messages:
|
|
|
|
for message in messages:
|
|
|
|
by_command[message["command"]] += 1
|
|
|
|
by_command[message["command"]] += 1
|
|
|
|
for command, count in by_command.items():
|
|
|
|
for command, count in by_command.items():
|
|
|
|
messages_written.labels(channel=self.channel, client=id(self), command=command).inc(count)
|
|
|
|
messages_written.labels(channel=self.channel, client=self.name, command=command).inc(count)
|
|
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
def run(self):
|
|
|
|
# wait for twitch to send the initial ROOMSTATE for the room we've joined.
|
|
|
|
# wait for twitch to send the initial ROOMSTATE for the room we've joined.
|
|
|
@ -199,19 +199,20 @@ class Archiver(object):
|
|
|
|
except gevent.queue.Empty:
|
|
|
|
except gevent.queue.Empty:
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.logger.debug("Got message: {}".format(message))
|
|
|
|
|
|
|
|
|
|
|
|
if message.command not in COMMANDS:
|
|
|
|
if message.command not in COMMANDS:
|
|
|
|
self.logger.info("Skipping non-whitelisted command: {}".format(message.command))
|
|
|
|
self.logger.info("Skipping non-whitelisted command: {}".format(message.command))
|
|
|
|
messages_ignored.labels(channel=self.channel, client=id(self), command=message.command).inc()
|
|
|
|
messages_ignored.labels(channel=self.channel, client=self.name, command=message.command).inc()
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
self.logger.debug("Got message: {}".format(message))
|
|
|
|
|
|
|
|
data = {
|
|
|
|
data = {
|
|
|
|
attr: getattr(message, attr)
|
|
|
|
attr: getattr(message, attr)
|
|
|
|
for attr in ('command', 'params', 'sender', 'user', 'host', 'tags')
|
|
|
|
for attr in ('command', 'params', 'sender', 'user', 'host', 'tags')
|
|
|
|
}
|
|
|
|
}
|
|
|
|
data['receivers'] = {self.name: message.received_at}
|
|
|
|
data['receivers'] = {self.name: message.received_at}
|
|
|
|
self.logger.debug("Got message: {}".format(data))
|
|
|
|
self.logger.debug("Got message data: {}".format(data))
|
|
|
|
messages_received.labels(channel=self.channel, client=id(self), command=message.command).inc()
|
|
|
|
messages_received.labels(channel=self.channel, client=self.name, command=message.command).inc()
|
|
|
|
|
|
|
|
|
|
|
|
if data['tags'] and data['tags'].get('emotes', '') != '':
|
|
|
|
if data['tags'] and data['tags'].get('emotes', '') != '':
|
|
|
|
emote_specs = data['tags']['emotes'].split('/')
|
|
|
|
emote_specs = data['tags']['emotes'].split('/')
|
|
|
@ -223,7 +224,7 @@ class Archiver(object):
|
|
|
|
timestamp = int(data['tags']['tmi-sent-ts']) / 1000. # original is int ms
|
|
|
|
timestamp = int(data['tags']['tmi-sent-ts']) / 1000. # original is int ms
|
|
|
|
last_timestamped_message = message
|
|
|
|
last_timestamped_message = message
|
|
|
|
last_server_time = timestamp
|
|
|
|
last_server_time = timestamp
|
|
|
|
server_lag.labels(channel=self.channel, client=id(self)).set(time.time() - timestamp)
|
|
|
|
server_lag.labels(channel=self.channel, client=self.name).set(time.time() - timestamp)
|
|
|
|
time_range = 0
|
|
|
|
time_range = 0
|
|
|
|
self.logger.debug("Message has exact timestamp: {}".format(timestamp))
|
|
|
|
self.logger.debug("Message has exact timestamp: {}".format(timestamp))
|
|
|
|
# check for any non-timestamped messages which we now know must have been
|
|
|
|
# check for any non-timestamped messages which we now know must have been
|
|
|
|