@ -1,12 +1,23 @@
import contextlib
import json
import math
import threading
import time
from . import get_suitable_downloader
from . common import FileDownloader
from . external import FFmpegFD
from . . downloader . hls import HlsFD
from . . networking import Request
from . . utils import DownloadError , str_or_none , try_get
from . . networking . exceptions import RequestError
from . . utils import (
DownloadError ,
RetryManager ,
str_or_none ,
traverse_obj ,
try_get ,
urljoin ,
)
class NiconicoDmcFD ( FileDownloader ) :
@ -56,34 +67,33 @@ class NiconicoDmcFD(FileDownloader):
return success
class NiconicoLive FD( FileDownloader ) :
""" Downloads niconico live without being stopped """
class NiconicoLive Base FD( FileDownloader ) :
_WEBSOCKET_RECONNECT_DELAY = 10
def real_download ( self , filename , info_dict ) :
video_id = info_dict [ ' video_id ' ]
ws_url = info_dict [ ' url ' ]
ws_extractor = info_dict [ ' ws ' ]
ws_origin_host = info_dict [ ' origin ' ]
live_quality = info_dict . get ( ' live_quality ' , ' high ' )
live_latency = info_dict . get ( ' live_latency ' , ' high ' )
dl = FFmpegFD ( self . ydl , self . params or { } )
new_info_dict = info_dict . copy ( )
new_info_dict . update ( {
' protocol ' : ' m3u8 ' ,
} )
@contextlib.contextmanager
def _ws_context ( self , info_dict ) :
""" Hold a WebSocket object and release it when leaving """
video_id = info_dict [ ' id ' ]
live_latency = info_dict [ ' live_latency ' ]
self . ws = info_dict [ ' __ws ' ]
self . m3u8_lock = threading . Event ( )
self . m3u8_url = info_dict [ ' manifest_url ' ]
self . m3u8_lock . set ( )
def communicate_ws ( reconnect ) :
if reconnect :
ws = self . ydl . urlopen ( Request ( ws_url , headers = { ' Origin ' : f ' https:// { ws_origin_host } ' } ) )
self . ws = self . ydl . urlopen ( Request (
self . ws . url , headers = { ' Origin ' : self . ws . wsw . request . headers [ ' Origin ' ] } ) )
if self . ydl . params . get ( ' verbose ' , False ) :
self . to_screen ( ' [debug] Sending startWatching request ' )
ws . send ( json . dumps ( {
self . ws . send ( json . dumps ( {
' type ' : ' startWatching ' ,
' data ' : {
' stream ' : {
' quality ' : live_quality ,
' protocol ' : ' hls +fmp4 ' ,
' quality ' : ' abr ' ,
' protocol ' : ' hls ' ,
' latency ' : live_latency ,
' chasePlay ' : False
} ,
@ -94,11 +104,9 @@ class NiconicoLiveFD(FileDownloader):
' reconnect ' : True ,
}
} ) )
else :
ws = ws_extractor
with ws :
with self . ws :
while True :
recv = ws . recv ( )
recv = self . ws . recv ( )
if not recv :
continue
data = json . loads ( recv )
@ -106,35 +114,155 @@ class NiconicoLiveFD(FileDownloader):
continue
if data . get ( ' type ' ) == ' ping ' :
# pong back
ws . send ( r ' { " type " : " pong " } ' )
ws . send ( r ' { " type " : " keepSeat " } ' )
self . ws . send ( r ' { " type " : " pong " } ' )
self . ws . send ( r ' { " type " : " keepSeat " } ' )
elif data . get ( ' type ' ) == ' stream ' :
self . m3u8_url = data [ ' data ' ] [ ' uri ' ]
self . m3u8_lock . set ( )
elif data . get ( ' type ' ) == ' disconnect ' :
self . write_debug ( data )
return True
return
elif data . get ( ' type ' ) == ' error ' :
self . write_debug ( data )
message = try_get ( data , lambda x : x [ ' body ' ] [ ' code ' ] , str ) or recv
r eturn DownloadError ( message )
r ais e DownloadError ( message )
elif self . ydl . params . get ( ' verbose ' , False ) :
if len ( recv ) > 100 :
recv = recv [ : 100 ] + ' ... '
self . to_screen ( ' [debug] Server said: %s ' % recv )
stopped = threading . Event ( )
def ws_main ( ) :
reconnect = False
while True :
while not stopped . is_set ( ) :
try :
ret = communicate_ws ( reconnect )
if ret is True :
return
except BaseException as e :
self . to_screen ( ' [ %s ] %s : Connection error occured, reconnecting after 10 seconds: %s ' % ( ' niconico:live ' , video_id , str_or_none ( e ) ) )
time . sleep ( 10 )
continue
finally :
communicate_ws ( reconnect )
break # Disconnected
except BaseException as e : # Including TransportError
if stopped . is_set ( ) :
break
self . m3u8_lock . clear ( ) # m3u8 url may be changed
self . to_screen ( ' [ %s ] %s : Connection error occured, reconnecting after %d seconds: %s ' % ( ' niconico:live ' , video_id , self . _WEBSOCKET_RECONNECT_DELAY , str_or_none ( e ) ) )
time . sleep ( self . _WEBSOCKET_RECONNECT_DELAY )
reconnect = True
self . m3u8_lock . set ( ) # Release possible locks
thread = threading . Thread ( target = ws_main , daemon = True )
thread . start ( )
return dl . download ( filename , new_info_dict )
try :
yield self
finally :
stopped . set ( )
self . ws . close ( )
thread . join ( )
def _master_m3u8_url ( self ) :
""" Get the refreshed manifest url after WebSocket reconnection to prevent HTTP 403 """
self . m3u8_lock . wait ( )
return self . m3u8_url
class NiconicoLiveFD ( NiconicoLiveBaseFD ) :
""" Downloads niconico live without being stopped """
def real_download ( self , filename , info_dict ) :
with self . _ws_context ( info_dict ) :
new_info_dict = info_dict . copy ( )
new_info_dict . update ( {
' protocol ' : ' m3u8 ' ,
} )
return FFmpegFD ( self . ydl , self . params or { } ) . download ( filename , new_info_dict )
class NiconicoLiveTimeshiftFD ( NiconicoLiveBaseFD , HlsFD ) :
""" Downloads niconico live timeshift VOD """
_PER_FRAGMENT_DOWNLOAD_RATIO = 0.1
def real_download ( self , filename , info_dict ) :
with self . _ws_context ( info_dict ) as ws_context :
from . . extractor . niconico import NiconicoIE
ie = NiconicoIE ( self . ydl )
video_id = info_dict [ ' id ' ]
# Get format index
for format_index , fmt in enumerate ( info_dict [ ' formats ' ] ) :
if fmt [ ' format_id ' ] == info_dict [ ' format_id ' ] :
break
# Get video info
total_duration = 0
fragment_duration = 0
for line in ie . _download_webpage ( info_dict [ ' url ' ] , video_id , note = ' Downloading m3u8 ' ) . splitlines ( ) :
if ' #STREAM-DURATION ' in line :
total_duration = int ( float ( line . split ( ' : ' ) [ 1 ] ) )
if ' #EXT-X-TARGETDURATION ' in line :
fragment_duration = int ( line . split ( ' : ' ) [ 1 ] )
if not all ( { total_duration , fragment_duration } ) :
raise DownloadError ( ' Unable to get required video info ' )
ctx = {
' filename ' : filename ,
' total_frags ' : math . ceil ( total_duration / fragment_duration ) ,
}
self . _prepare_and_start_frag_download ( ctx , info_dict )
downloaded_duration = ctx [ ' fragment_index ' ] * fragment_duration
while True :
if downloaded_duration > total_duration :
break
retry_manager = RetryManager ( self . params . get ( ' fragment_retries ' ) , self . report_retry )
for retry in retry_manager :
try :
# Refresh master m3u8 (if possible) and get the url of the previously-chose format
master_m3u8_url = ws_context . _master_m3u8_url ( )
formats = ie . _extract_m3u8_formats (
master_m3u8_url , video_id , query = { " start " : downloaded_duration } , live = False , note = False , fatal = False )
media_m3u8_url = traverse_obj ( formats , ( format_index , { dict } , ' url ' ) , get_all = False )
if not media_m3u8_url :
raise DownloadError ( ' Unable to get playlist ' )
# Get all fragments
media_m3u8 = ie . _download_webpage ( media_m3u8_url , video_id , note = False )
fragment_urls = traverse_obj ( media_m3u8 . splitlines ( ) , (
lambda _ , v : not v . startswith ( ' # ' ) , { lambda url : urljoin ( media_m3u8_url , url ) } ) )
with self . DurationLimiter ( len ( fragment_urls ) * fragment_duration * self . _PER_FRAGMENT_DOWNLOAD_RATIO ) :
for fragment_url in fragment_urls :
success = self . _download_fragment ( ctx , fragment_url , info_dict )
if not success :
return False
self . _append_fragment ( ctx , self . _read_fragment ( ctx ) )
downloaded_duration + = fragment_duration
except ( DownloadError , RequestError ) as err : # Including HTTPError and TransportError
retry . error = err
continue
if retry_manager . error :
return False
return self . _finish_frag_download ( ctx , info_dict )
class DurationLimiter ( ) :
def __init__ ( self , target ) :
self . target = target
def __enter__ ( self ) :
self . start = time . time ( )
def __exit__ ( self , * exc ) :
remaining = self . target - ( time . time ( ) - self . start )
if remaining > 0 :
time . sleep ( remaining )