|
|
@ -1,5 +1,4 @@
|
|
|
|
|
|
|
|
|
|
|
|
from calendar import timegm
|
|
|
|
|
|
|
|
import base64
|
|
|
|
import base64
|
|
|
|
import hashlib
|
|
|
|
import hashlib
|
|
|
|
import json
|
|
|
|
import json
|
|
|
@ -8,7 +7,9 @@ import os
|
|
|
|
import signal
|
|
|
|
import signal
|
|
|
|
import socket
|
|
|
|
import socket
|
|
|
|
import time
|
|
|
|
import time
|
|
|
|
|
|
|
|
from calendar import timegm
|
|
|
|
from datetime import datetime
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
from itertools import count
|
|
|
|
from uuid import uuid4
|
|
|
|
from uuid import uuid4
|
|
|
|
|
|
|
|
|
|
|
|
import gevent.event
|
|
|
|
import gevent.event
|
|
|
@ -55,6 +56,7 @@ class Archiver(object):
|
|
|
|
self.path = os.path.join(base_dir, channel)
|
|
|
|
self.path = os.path.join(base_dir, channel)
|
|
|
|
|
|
|
|
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
self.stopping = gevent.event.Event()
|
|
|
|
|
|
|
|
self.got_reconnect = gevent.event.Event()
|
|
|
|
self.client = Client(
|
|
|
|
self.client = Client(
|
|
|
|
hostname='irc.chat.twitch.tv',
|
|
|
|
hostname='irc.chat.twitch.tv',
|
|
|
|
port=6697,
|
|
|
|
port=6697,
|
|
|
@ -74,8 +76,17 @@ class Archiver(object):
|
|
|
|
@self.client.handler(command='ROOMSTATE', sync=True)
|
|
|
|
@self.client.handler(command='ROOMSTATE', sync=True)
|
|
|
|
def register_real_handler(client, message):
|
|
|
|
def register_real_handler(client, message):
|
|
|
|
self.client.handler(lambda c, m: self.messages.put(m), sync=True)
|
|
|
|
self.client.handler(lambda c, m: self.messages.put(m), sync=True)
|
|
|
|
|
|
|
|
self.logger.info("Client started")
|
|
|
|
return True
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Twitch sends a RECONNECT shortly before terminating the connection from the server side.
|
|
|
|
|
|
|
|
# This gives us time to start up a new instance of the archiver while keeping this one
|
|
|
|
|
|
|
|
# running, so that we can be sure we don't miss anything. This will cause duplicate batches,
|
|
|
|
|
|
|
|
# but those will get merged later.
|
|
|
|
|
|
|
|
@self.client.handler(command='RECONNECT')
|
|
|
|
|
|
|
|
def handle_reconnect(client, message):
|
|
|
|
|
|
|
|
self.got_reconnect.set()
|
|
|
|
|
|
|
|
|
|
|
|
self.client.start()
|
|
|
|
self.client.start()
|
|
|
|
|
|
|
|
|
|
|
|
last_server_time = None
|
|
|
|
last_server_time = None
|
|
|
@ -83,7 +94,7 @@ class Archiver(object):
|
|
|
|
# {batch time: [messages]}
|
|
|
|
# {batch time: [messages]}
|
|
|
|
batches = {}
|
|
|
|
batches = {}
|
|
|
|
|
|
|
|
|
|
|
|
while not self.stopping.is_set():
|
|
|
|
while not (self.stopping.is_set() and self.messages.empty()):
|
|
|
|
# wait until we either have a message, are stopping, or a batch can be closed
|
|
|
|
# wait until we either have a message, are stopping, or a batch can be closed
|
|
|
|
if batches:
|
|
|
|
if batches:
|
|
|
|
next_batch_close = min(batches.keys()) + BATCH_INTERVAL + MAX_SERVER_LAG
|
|
|
|
next_batch_close = min(batches.keys()) + BATCH_INTERVAL + MAX_SERVER_LAG
|
|
|
@ -168,6 +179,7 @@ class Archiver(object):
|
|
|
|
write_batch(self.path, batch_time, messages)
|
|
|
|
write_batch(self.path, batch_time, messages)
|
|
|
|
|
|
|
|
|
|
|
|
self.client.wait_for_stop() # re-raise any errors
|
|
|
|
self.client.wait_for_stop() # re-raise any errors
|
|
|
|
|
|
|
|
self.logger.info("Client stopped")
|
|
|
|
|
|
|
|
|
|
|
|
def stop(self):
|
|
|
|
def stop(self):
|
|
|
|
self.client.stop()
|
|
|
|
self.client.stop()
|
|
|
@ -376,12 +388,30 @@ def merge_messages(left, right):
|
|
|
|
def main(channel, nick, oauth_token_path, base_dir='/mnt'):
|
|
|
|
def main(channel, nick, oauth_token_path, base_dir='/mnt'):
|
|
|
|
with open(oauth_token_path) as f:
|
|
|
|
with open(oauth_token_path) as f:
|
|
|
|
oauth_token = f.read()
|
|
|
|
oauth_token = f.read()
|
|
|
|
name = socket.gethostname()
|
|
|
|
# To ensure uniqueness even if multiple instances are running on the same host,
|
|
|
|
|
|
|
|
# also include our pid
|
|
|
|
|
|
|
|
name = "{}.{}".format(socket.gethostname(), os.getpid())
|
|
|
|
|
|
|
|
|
|
|
|
archiver = Archiver(name, base_dir, channel, nick, oauth_token)
|
|
|
|
stopping = gevent.event.Event()
|
|
|
|
|
|
|
|
gevent.signal_handler(signal.SIGTERM, stopping.set)
|
|
|
|
gevent.signal_handler(signal.SIGTERM, archiver.stop)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.info("Starting")
|
|
|
|
logging.info("Starting")
|
|
|
|
archiver.run()
|
|
|
|
for index in count():
|
|
|
|
|
|
|
|
# To ensure uniqueness between clients, include a client number
|
|
|
|
|
|
|
|
archiver = Archiver("{}.{}".format(name, index), base_dir, channel, nick, oauth_token)
|
|
|
|
|
|
|
|
worker = gevent.spawn(archiver.run)
|
|
|
|
|
|
|
|
# wait for either graceful exit, error, or for a signal from the archiver that a reconnect was requested
|
|
|
|
|
|
|
|
gevent.wait([stopping, worker, archiver.got_reconnect], count=1)
|
|
|
|
|
|
|
|
if stopping.is_set():
|
|
|
|
|
|
|
|
archiver.stop()
|
|
|
|
|
|
|
|
worker.get()
|
|
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
# if got reconnect, discard the old archiver (we don't care even if it fails after this)
|
|
|
|
|
|
|
|
# and make a new one
|
|
|
|
|
|
|
|
if archiver.got_reconnect.is_set():
|
|
|
|
|
|
|
|
logging.info("Got RECONNECT, creating new client while waiting for old one to finish")
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
# the only remaining case is that the worker failed, re-raise
|
|
|
|
|
|
|
|
worker.get()
|
|
|
|
|
|
|
|
assert False, "Not stopping, but worker exited successfully"
|
|
|
|
logging.info("Gracefully stopped")
|
|
|
|
logging.info("Gracefully stopped")
|
|
|
|