@ -21,6 +21,7 @@ import subprocess
import socket
import sys
import time
import tokenize
import traceback
if os . name == ' nt ' :
@ -34,6 +35,7 @@ from .compat import (
compat_http_client ,
compat_kwargs ,
compat_str ,
compat_tokenize_tokenize ,
compat_urllib_error ,
compat_urllib_request ,
)
@ -851,8 +853,8 @@ class YoutubeDL(object):
else :
raise Exception ( ' Invalid result type: %s ' % result_type )
def _ apply_format_filter( self , format_spec , available_formats ) :
" Returns a tuple of the remaining format_spec and filtered formats "
def _ build_format_filter( self , filter_spec ) :
" Returns a function to filter the formats according to the filter_spec "
OPERATORS = {
' < ' : operator . lt ,
@ -862,13 +864,13 @@ class YoutubeDL(object):
' = ' : operator . eq ,
' != ' : operator . ne ,
}
operator_rex = re . compile ( r ''' (?x) \ s* \ [
operator_rex = re . compile ( r ''' (?x) \ s*
( ? P < key > width | height | tbr | abr | vbr | asr | filesize | fps )
\s * ( ? P < op > % s ) ( ? P < none_inclusive > \s * \? ) ? \s *
( ? P < value > [ 0 - 9. ] + ( ? : [ kKmMgGtTpPeEzZyY ] i ? [ Bb ] ? ) ? )
\] $
$
''' % ' | ' .join(map(re.escape, OPERATORS.keys())))
m = operator_rex . search ( f ormat _spec)
m = operator_rex . search ( f ilter _spec)
if m :
try :
comparison_value = int ( m . group ( ' value ' ) )
@ -879,7 +881,7 @@ class YoutubeDL(object):
if comparison_value is None :
raise ValueError (
' Invalid value %r in format specification %r ' % (
m . group ( ' value ' ) , f ormat _spec) )
m . group ( ' value ' ) , f ilter _spec) )
op = OPERATORS [ m . group ( ' op ' ) ]
if not m :
@ -887,85 +889,201 @@ class YoutubeDL(object):
' = ' : operator . eq ,
' != ' : operator . ne ,
}
str_operator_rex = re . compile ( r ''' (?x) \ s* \ [
str_operator_rex = re . compile ( r ''' (?x)
\s * ( ? P < key > ext | acodec | vcodec | container | protocol )
\s * ( ? P < op > % s ) ( ? P < none_inclusive > \s * \? ) ?
\s * ( ? P < value > [ a - zA - Z0 - 9 _ - ] + )
\s * \] $
\s * $
''' % ' | ' .join(map(re.escape, STR_OPERATORS.keys())))
m = str_operator_rex . search ( f ormat _spec)
m = str_operator_rex . search ( f ilter _spec)
if m :
comparison_value = m . group ( ' value ' )
op = STR_OPERATORS [ m . group ( ' op ' ) ]
if not m :
raise ValueError ( ' Invalid f ormat specification %r ' % format _spec)
raise ValueError ( ' Invalid f ilter specification %r ' % filter _spec)
def _filter ( f ) :
actual_value = f . get ( m . group ( ' key ' ) )
if actual_value is None :
return m . group ( ' none_inclusive ' )
return op ( actual_value , comparison_value )
new_formats = [ f for f in available_formats if _filter ( f ) ]
return _filter
def build_format_selector ( self , format_spec ) :
# Build (and return, not raise) a SyntaxError describing a problem in the
# format specification; callers are expected to `raise` the returned object.
#   note  -- human-readable description of the problem.
#   start -- indexable position; only start[1] is read here (presumably the
#            column from a tokenize (row, col) start pair -- confirm at callers).
def syntax_error ( note , start ) :
# The message is formatted from three values: the note, the enclosing
# `format_spec`, and a run of start[1] spaces that precedes the caret so
# the caret is offset by that many columns.
message = (
' Invalid format specification: '
' {0} \n \t {1} \n \t {2} ^ ' . format ( note , format_spec , ' ' * start [ 1 ] ) )
return SyntaxError ( message )
PICKFIRST = ' PICKFIRST '
MERGE = ' MERGE '
SINGLE = ' SINGLE '
FormatSelector = collections . namedtuple ( ' FormatSelector ' , [ ' type ' , ' selector ' , ' filters ' ] )
def _parse_filter(tokens):
    """Collect the text of a bracketed filter from a token stream.

    Consumes items from `tokens` (5-tuples whose first two fields are the
    token type and the token string) up to and including the closing ']'
    operator, and returns the strings seen before it joined together.
    Items after the closing bracket are left unconsumed in the iterator.

    Raises SyntaxError if `tokens` is exhausted before a closing ']' is
    seen. Previously this case fell off the end of the loop and returned
    None, which the caller then stored as if it were a filter string.
    """
    filter_parts = []
    # Local names avoid shadowing the `type` builtin.
    for tok_type, tok_string, _, _, _ in tokens:
        if tok_type == tokenize.OP and tok_string == ']':
            return ''.join(filter_parts)
        filter_parts.append(tok_string)
    # No closing bracket: report it here rather than handing None onwards.
    raise SyntaxError(
        'Invalid format specification: missing closing "]" in filter %r'
        % ''.join(filter_parts))
# Walk a token stream and build a list of FormatSelector tuples.
#   tokens  -- iterable of 5-tuples (type, string, start, _, _); it is passed
#              unchanged to the recursive calls and to _parse_filter, so this
#              assumes a single-pass iterator whose position is shared -- confirm.
#   endwith -- operator strings that stop parsing at this level. The default
#              list is only tested with `in`, never mutated, in this function.
# Returns the list of selectors collected at this level.
def _parse_format_selection ( tokens , endwith = [ ] ) :
selectors = [ ]
current_selector = None
for type , string , start , _ , _ in tokens :
# ENCODING is only defined in python 3.x
if type == getattr ( tokenize , ' ENCODING ' , None ) :
continue
# A name or number token starts a new SINGLE selector, replacing any
# selector that was pending.
elif type in [ tokenize . NAME , tokenize . NUMBER ] :
current_selector = FormatSelector ( SINGLE , string , [ ] )
elif type == tokenize . OP :
# A terminating operator ends this (possibly nested) level; the token
# has already been taken from the stream when the loop breaks.
if string in endwith :
break
# Comma: commit the pending selector and start afresh.
# NOTE(review): the pending selector is appended without a None check.
if string == ' , ' :
selectors . append ( current_selector )
current_selector = None
# Slash: pair the pending selector (may be None) with whatever the
# recursive call parses; second_choice is that call's returned list.
elif string == ' / ' :
first_choice = current_selector
second_choice = _parse_format_selection ( tokens , [ ' , ' ] )
current_selector = None
selectors . append ( FormatSelector ( PICKFIRST , ( first_choice , second_choice ) , [ ] ) )
# Opening bracket: a filter follows. With no pending selector a default
# SINGLE selector is created first, then the filter text is attached.
elif string == ' [ ' :
if not current_selector :
current_selector = FormatSelector ( SINGLE , ' best ' , [ ] )
format_filter = _parse_filter ( tokens )
current_selector . filters . append ( format_filter )
# Plus: same shape as the slash branch, but recorded as a MERGE of the
# pending selector and the list returned by the recursive call.
elif string == ' + ' :
video_selector = current_selector
audio_selector = _parse_format_selection ( tokens , [ ' , ' ] )
current_selector = None
selectors . append ( FormatSelector ( MERGE , ( video_selector , audio_selector ) , [ ] ) )
# Any other operator is rejected, reporting the token's start position.
else :
raise syntax_error ( ' Operator not recognized: " {0} " ' . format ( string ) , start )
elif type == tokenize . ENDMARKER :
break
# Commit a selector that was still pending when the loop ended.
if current_selector :
selectors . append ( current_selector )
return selectors
def _build_selector_function ( selector ) :
if isinstance ( selector , list ) :
fs = [ _build_selector_function ( s ) for s in selector ]
def selector_function ( formats ) :
for f in fs :
for format in f ( formats ) :
yield format
return selector_function
elif selector . type == PICKFIRST :
fs = [ _build_selector_function ( s ) for s in selector . selector ]
def selector_function ( formats ) :
for f in fs :
picked_formats = list ( f ( formats ) )
if picked_formats :
return picked_formats
return [ ]
elif selector . type == SINGLE :
format_spec = selector . selector
def selector_function ( formats ) :
if format_spec in [ ' best ' , ' worst ' , None ] :
format_idx = 0 if format_spec == ' worst ' else - 1
audiovideo_formats = [
f for f in formats
if f . get ( ' vcodec ' ) != ' none ' and f . get ( ' acodec ' ) != ' none ' ]
if audiovideo_formats :
yield audiovideo_formats [ format_idx ]
# for audio only (soundcloud) or video only (imgur) urls, select the best/worst audio format
elif ( all ( f . get ( ' acodec ' ) != ' none ' for f in formats ) or
all ( f . get ( ' vcodec ' ) != ' none ' for f in formats ) ) :
yield formats [ format_idx ]
elif format_spec == ' bestaudio ' :
audio_formats = [
f for f in formats
if f . get ( ' vcodec ' ) == ' none ' ]
if audio_formats :
yield audio_formats [ - 1 ]
elif format_spec == ' worstaudio ' :
audio_formats = [
f for f in formats
if f . get ( ' vcodec ' ) == ' none ' ]
if audio_formats :
yield audio_formats [ 0 ]
elif format_spec == ' bestvideo ' :
video_formats = [
f for f in formats
if f . get ( ' acodec ' ) == ' none ' ]
if video_formats :
yield video_formats [ - 1 ]
elif format_spec == ' worstvideo ' :
video_formats = [
f for f in formats
if f . get ( ' acodec ' ) == ' none ' ]
if video_formats :
yield video_formats [ 0 ]
else :
extensions = [ ' mp4 ' , ' flv ' , ' webm ' , ' 3gp ' , ' m4a ' , ' mp3 ' , ' ogg ' , ' aac ' , ' wav ' ]
if format_spec in extensions :
filter_f = lambda f : f [ ' ext ' ] == format_spec
else :
filter_f = lambda f : f [ ' format_id ' ] == format_spec
matches = list ( filter ( filter_f , formats ) )
if matches :
yield matches [ - 1 ]
elif selector . type == MERGE :
def _merge ( formats_info ) :
format_1 , format_2 = [ f [ ' format_id ' ] for f in formats_info ]
# The first format must contain the video and the
# second the audio
if formats_info [ 0 ] . get ( ' vcodec ' ) == ' none ' :
self . report_error ( ' The first format must '
' contain the video, try using '
' " -f %s + %s " ' % ( format_2 , format_1 ) )
return
output_ext = (
formats_info [ 0 ] [ ' ext ' ]
if self . params . get ( ' merge_output_format ' ) is None
else self . params [ ' merge_output_format ' ] )
return {
' requested_formats ' : formats_info ,
' format ' : ' %s + %s ' % ( formats_info [ 0 ] . get ( ' format ' ) ,
formats_info [ 1 ] . get ( ' format ' ) ) ,
' format_id ' : ' %s + %s ' % ( formats_info [ 0 ] . get ( ' format_id ' ) ,
formats_info [ 1 ] . get ( ' format_id ' ) ) ,
' width ' : formats_info [ 0 ] . get ( ' width ' ) ,
' height ' : formats_info [ 0 ] . get ( ' height ' ) ,
' resolution ' : formats_info [ 0 ] . get ( ' resolution ' ) ,
' fps ' : formats_info [ 0 ] . get ( ' fps ' ) ,
' vcodec ' : formats_info [ 0 ] . get ( ' vcodec ' ) ,
' vbr ' : formats_info [ 0 ] . get ( ' vbr ' ) ,
' stretched_ratio ' : formats_info [ 0 ] . get ( ' stretched_ratio ' ) ,
' acodec ' : formats_info [ 1 ] . get ( ' acodec ' ) ,
' abr ' : formats_info [ 1 ] . get ( ' abr ' ) ,
' ext ' : output_ext ,
}
video_selector , audio_selector = map ( _build_selector_function , selector . selector )
new_format_spec = format_spec [ : - len ( m . group ( 0 ) ) ]
if not new_format_spec :
new_format_spec = ' best '
def selector_function ( formats ) :
formats = list ( formats )
for pair in itertools . product ( video_selector ( formats ) , audio_selector ( formats ) ) :
yield _merge ( pair )
return ( new_format_spec , new_formats )
filters = [ self . _build_format_filter ( f ) for f in selector . filters ]
def select_format ( self , format_spec , available_formats ) :
while format_spec . endswith ( ' ] ' ) :
format_spec , available_formats = self . _apply_format_filter (
format_spec , available_formats )
if not available_formats :
return None
def final_selector ( formats ) :
for _filter in filters :
formats = list ( filter ( _filter , formats ) )
return selector_function ( formats )
return final_selector
if format_spec in [ ' best ' , ' worst ' , None ] :
format_idx = 0 if format_spec == ' worst ' else - 1
audiovideo_formats = [
f for f in available_formats
if f . get ( ' vcodec ' ) != ' none ' and f . get ( ' acodec ' ) != ' none ' ]
if audiovideo_formats :
return audiovideo_formats [ format_idx ]
# for audio only (soundcloud) or video only (imgur) urls, select the best/worst audio format
elif ( all ( f . get ( ' acodec ' ) != ' none ' for f in available_formats ) or
all ( f . get ( ' vcodec ' ) != ' none ' for f in available_formats ) ) :
return available_formats [ format_idx ]
elif format_spec == ' bestaudio ' :
audio_formats = [
f for f in available_formats
if f . get ( ' vcodec ' ) == ' none ' ]
if audio_formats :
return audio_formats [ - 1 ]
elif format_spec == ' worstaudio ' :
audio_formats = [
f for f in available_formats
if f . get ( ' vcodec ' ) == ' none ' ]
if audio_formats :
return audio_formats [ 0 ]
elif format_spec == ' bestvideo ' :
video_formats = [
f for f in available_formats
if f . get ( ' acodec ' ) == ' none ' ]
if video_formats :
return video_formats [ - 1 ]
elif format_spec == ' worstvideo ' :
video_formats = [
f for f in available_formats
if f . get ( ' acodec ' ) == ' none ' ]
if video_formats :
return video_formats [ 0 ]
else :
extensions = [ ' mp4 ' , ' flv ' , ' webm ' , ' 3gp ' , ' m4a ' , ' mp3 ' , ' ogg ' , ' aac ' , ' wav ' ]
if format_spec in extensions :
filter_f = lambda f : f [ ' ext ' ] == format_spec
else :
filter_f = lambda f : f [ ' format_id ' ] == format_spec
matches = list ( filter ( filter_f , available_formats ) )
if matches :
return matches [ - 1 ]
return None
stream = io . BytesIO ( format_spec . encode ( ' utf-8 ' ) )
tokens = compat_tokenize_tokenize ( stream . readline )
parsed_selector = _parse_format_selection ( tokens )
return _build_selector_function ( parsed_selector )
def _calc_headers ( self , info_dict ) :
res = std_headers . copy ( )
@ -1112,52 +1230,8 @@ class YoutubeDL(object):
if req_format == ' all ' :
formats_to_download = formats
else :
for rfstr in req_format . split ( ' , ' ) :
# We can accept formats requested in the format: 34/5/best, we pick
# the first that is available, starting from left
req_formats = rfstr . split ( ' / ' )
for rf in req_formats :
if re . match ( r ' .+? \ +.+? ' , rf ) is not None :
# Two formats have been requested like '137+139'
format_1 , format_2 = rf . split ( ' + ' )
formats_info = ( self . select_format ( format_1 , formats ) ,
self . select_format ( format_2 , formats ) )
if all ( formats_info ) :
# The first format must contain the video and the
# second the audio
if formats_info [ 0 ] . get ( ' vcodec ' ) == ' none ' :
self . report_error ( ' The first format must '
' contain the video, try using '
' " -f %s + %s " ' % ( format_2 , format_1 ) )
return
output_ext = (
formats_info [ 0 ] [ ' ext ' ]
if self . params . get ( ' merge_output_format ' ) is None
else self . params [ ' merge_output_format ' ] )
selected_format = {
' requested_formats ' : formats_info ,
' format ' : ' %s + %s ' % ( formats_info [ 0 ] . get ( ' format ' ) ,
formats_info [ 1 ] . get ( ' format ' ) ) ,
' format_id ' : ' %s + %s ' % ( formats_info [ 0 ] . get ( ' format_id ' ) ,
formats_info [ 1 ] . get ( ' format_id ' ) ) ,
' width ' : formats_info [ 0 ] . get ( ' width ' ) ,
' height ' : formats_info [ 0 ] . get ( ' height ' ) ,
' resolution ' : formats_info [ 0 ] . get ( ' resolution ' ) ,
' fps ' : formats_info [ 0 ] . get ( ' fps ' ) ,
' vcodec ' : formats_info [ 0 ] . get ( ' vcodec ' ) ,
' vbr ' : formats_info [ 0 ] . get ( ' vbr ' ) ,
' stretched_ratio ' : formats_info [ 0 ] . get ( ' stretched_ratio ' ) ,
' acodec ' : formats_info [ 1 ] . get ( ' acodec ' ) ,
' abr ' : formats_info [ 1 ] . get ( ' abr ' ) ,
' ext ' : output_ext ,
}
else :
selected_format = None
else :
selected_format = self . select_format ( rf , formats )
if selected_format is not None :
formats_to_download . append ( selected_format )
break
format_selector = self . build_format_selector ( req_format )
formats_to_download = list ( format_selector ( formats ) )
if not formats_to_download :
raise ExtractorError ( ' requested format not available ' ,
expected = True )