@@ -3,6 +3,7 @@ import json
 import logging
 import signal
 from collections import defaultdict
+from urllib.parse import urlparse
 
 import argh
 import gevent.backdoor
@@ -15,6 +16,7 @@ from requests import HTTPError
 import common
 import common.dateutil
 from common.database import DBManager, query, get_column_placeholder
+from common.media import check_for_media, download_media
 from common.sheets import Sheets as SheetsClient
 
 from .sheets import SheetsEventsMiddleware, SheetsPlaylistsMiddleware, SheetsArchiveMiddleware
@@ -274,6 +276,12 @@ class EventsSync(SheetSync):
 		"category",
 	}
 
+	def __init__(self, name, middleware, stop, dbmanager, reverse_sync=False, media_dir=None):
+		super().__init__(name, middleware, stop, dbmanager, reverse_sync)
+		self.media_dir = media_dir
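+		# Maps each url to the greenlet downloading it; None means downloads are disabled.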
+		self.media_downloads = None if media_dir is None else {}
+
 	def observe_rows(self, rows):
 		counts = defaultdict(lambda: 0)
 		for row in rows:
@@ -287,6 +295,27 @@ class EventsSync(SheetSync):
 
 	def sync_row(self, sheet_row, db_row):
 		# Do some special-case transforms for events before syncing
+		# Attempt to download any URLs in the links column if we don't already have them.
+		# This is done asynchronously. We keep a record of failed attempts for two reasons:
+		# - To avoid retrying
+		# - To populate the errors column asynchronously
+		# This record is just in memory - we're ok retrying after every restart.
+		# You can disable downloads on a per-row basis by putting "[nodownload]" in the notes column.
+		if sheet_row is not None and self.media_dir is not None and "[nodownload]" not in sheet_row["notes"]:
+			for url in sheet_row['image_links']:
+				if url not in self.media_downloads:
+					self.media_downloads[url] = gevent.spawn(self.download_media, url)
+				# Greenlet.exception is populated if the greenlet failed with an exception,
+				# or None otherwise (success or not finished).
+				# We treat a failure to fetch a URL like a parse error.
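+				# A download still in progress has no exception yet; since sync_row() is
+				# re-run on every sync pass, a failure that lands later is still reported.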
+				e = self.media_downloads[url].exception
+				if e is not None:
+					sheet_row.setdefault("_parse_errors", []).append(
+						f"Failed to download media link {url!r}: {e}"
+					)
+
 		if db_row is not None:
 			# If no database error, but we have parse errors, indicate they should be displayed.
 			if db_row.error is None and sheet_row is not None and sheet_row.get('_parse_errors'):
@@ -300,6 +329,23 @@ class EventsSync(SheetSync):
 
 		super().sync_row(sheet_row, db_row)
 
+	def download_media(self, url):
+		hostname = urlparse(url).hostname
+		if hostname in ("youtu.be", "youtube.com"):
+			self.logger.info(f"Ignoring url {url!r}: Blocklisted hostname")
+			return
+		if check_for_media(self.media_dir, url):
+			self.logger.info(f"Already have content for url {url!r}")
+			return
+		try:
+			download_media(url, self.media_dir)
+		except Exception:
+			self.logger.warning(f"Failed to download url {url!r}", exc_info=True)
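+			# Re-raise so the greenlet's exception is set, letting sync_row() surface
+			# the failure as a parse error on the affected row.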
+			raise
+		self.logger.info(f"Downloaded media for url {url!r}")
+
 
 class ArchiveSync(EventsSync):
 	# Archive events are a special case of event with less input columns.
@@ -372,7 +418,7 @@ class PlaylistsSync(SheetSync):
 		event_id: The id of the streamlog event to sync
 	""",
 )
-def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
+def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0, media_dir="."):
 	"""
 	Sheet sync constantly scans a Google Sheets sheet and a database, copying inputs from the sheet
 	to the DB and outputs from the DB to the sheet.
@@ -466,7 +512,10 @@ def main(dbconnect, sync_configs, metrics_port=8005, backdoor_port=0):
 			"playlists": PlaylistsSync,
 			"archive": ArchiveSync,
 		}[config["type"]]
-		sync = sync_class(config["name"], middleware, stop, dbmanager, reverse_sync)
+		sync_class_kwargs = {}
+		if config["type"] == "events" and config.get("download_media", False):
+			sync_class_kwargs["media_dir"] = media_dir
+		sync = sync_class(config["name"], middleware, stop, dbmanager, reverse_sync, **sync_class_kwargs)
 		workers.append(sync)
 
 	jobs = [gevent.spawn(worker.run) for worker in workers]