@ -11,6 +11,7 @@ from yt_dlp.extractor.youtube._streaming.sabr.part import (
MediaSeekSabrPart ,
MediaSegmentInitSabrPart ,
MediaSegmentDataSabrPart ,
MediaSegmentEndSabrPart ,
)
from yt_dlp . extractor . youtube . _streaming . sabr . processor import (
@ -21,6 +22,7 @@ from yt_dlp.extractor.youtube._streaming.sabr.processor import (
ProcessSabrSeekResult ,
ProcessMediaHeaderResult ,
ProcessMediaResult ,
ProcessMediaEndResult ,
)
from yt_dlp . extractor . youtube . _streaming . sabr . models import (
AudioSelector ,
@ -2165,3 +2167,357 @@ class TestMedia:
segment_start_bytes = 0 ,
)
assert processor . partial_segments [ media_header . header_id ] . received_data_length == len ( example_payload )
class TestMediaEnd :
def test_init_segment_media_end ( self , base_args ) :
# Should process media end for init segment
selector = make_selector ( ' audio ' )
processor = SabrProcessor (
* * base_args ,
audio_selection = selector ,
)
fim = make_format_im ( selector )
processor . process_format_initialization_metadata ( fim )
media_header = make_init_header ( selector )
processor . process_media_header ( media_header )
processor . process_media ( media_header . header_id , media_header . content_length , io . BytesIO ( b ' example-init-data ' ) )
segment = processor . partial_segments [ media_header . header_id ]
result = processor . process_media_end ( media_header . header_id )
assert isinstance ( result , ProcessMediaEndResult )
assert isinstance ( result . sabr_part , MediaSegmentEndSabrPart )
assert result . sabr_part == MediaSegmentEndSabrPart (
format_selector = selector ,
format_id = selector . format_ids [ 0 ] ,
sequence_number = None ,
is_init_segment = True ,
total_segments = fim . total_segments ,
)
assert result . is_new_segment is True
assert media_header . header_id not in processor . partial_segments
init_format = processor . initialized_formats [ str ( selector . format_ids [ 0 ] ) ]
assert init_format . init_segment is segment
assert init_format . current_segment is None
assert not init_format . consumed_ranges
def test_media_segment_media_end ( self , base_args ) :
# Should process media end for a regular media segment
selector = make_selector ( ' audio ' )
processor = SabrProcessor (
* * base_args ,
audio_selection = selector ,
)
fim = make_format_im ( selector )
processor . process_format_initialization_metadata ( fim )
media_header = make_media_header ( selector , sequence_no = 1 )
processor . process_media_header ( media_header )
processor . process_media ( media_header . header_id , media_header . content_length , io . BytesIO ( b ' example-data ' ) )
segment = processor . partial_segments [ media_header . header_id ]
result = processor . process_media_end ( media_header . header_id )
assert isinstance ( result , ProcessMediaEndResult )
assert isinstance ( result . sabr_part , MediaSegmentEndSabrPart )
assert result . sabr_part == MediaSegmentEndSabrPart (
format_selector = selector ,
format_id = selector . format_ids [ 0 ] ,
sequence_number = 1 ,
is_init_segment = False ,
total_segments = fim . total_segments ,
)
assert result . is_new_segment is True
assert media_header . header_id not in processor . partial_segments
init_format = processor . initialized_formats [ str ( selector . format_ids [ 0 ] ) ]
assert init_format . init_segment is None
assert init_format . current_segment is segment
assert len ( init_format . consumed_ranges ) == 1
assert init_format . consumed_ranges [ 0 ] == ConsumedRange (
start_sequence_number = 1 ,
end_sequence_number = 1 ,
start_time_ms = media_header . start_ms ,
duration_ms = media_header . duration_ms ,
)
def test_media_segment_update_consumed_range ( self , base_args ) :
# Should update an existing consumed range the segment belongs to (at the end of)
selector = make_selector ( ' audio ' )
processor = SabrProcessor (
* * base_args ,
audio_selection = selector ,
)
fim = make_format_im ( selector )
processor . process_format_initialization_metadata ( fim )
init_format = processor . initialized_formats [ str ( selector . format_ids [ 0 ] ) ]
init_format . consumed_ranges . append (
ConsumedRange (
start_sequence_number = 1 ,
end_sequence_number = 3 ,
start_time_ms = 20 ,
duration_ms = 3000 ) )
# Unrelated consumed range
init_format . consumed_ranges . append (
ConsumedRange (
start_sequence_number = 6 ,
end_sequence_number = 10 ,
start_time_ms = 6000 ,
duration_ms = 3000 ) )
media_header = make_media_header ( selector , sequence_no = 4 )
media_header . start_ms = 3021
media_header . duration_ms = 1050
processor . process_media_header ( media_header )
processor . process_media ( media_header . header_id , media_header . content_length , io . BytesIO ( b ' example-data ' ) )
segment = processor . partial_segments [ media_header . header_id ]
result = processor . process_media_end ( media_header . header_id )
assert isinstance ( result , ProcessMediaEndResult )
assert isinstance ( result . sabr_part , MediaSegmentEndSabrPart )
assert result . sabr_part == MediaSegmentEndSabrPart (
format_selector = selector ,
format_id = selector . format_ids [ 0 ] ,
sequence_number = 4 ,
is_init_segment = False ,
total_segments = fim . total_segments ,
)
assert result . is_new_segment is True
assert len ( init_format . consumed_ranges ) == 2
assert init_format . consumed_ranges [ 0 ] == ConsumedRange (
start_sequence_number = 1 ,
end_sequence_number = 4 ,
start_time_ms = 20 ,
duration_ms = 4051 ,
)
assert init_format . consumed_ranges [ 1 ] == ConsumedRange (
start_sequence_number = 6 ,
end_sequence_number = 10 ,
start_time_ms = 6000 ,
duration_ms = 3000 ,
)
assert init_format . current_segment is segment
def test_media_segment_discard ( self , base_args ) :
# Should discard the segment if it is marked as discard. Consumed ranges should be updated.
selector = make_selector ( ' audio ' , discard_media = True )
processor = SabrProcessor (
* * base_args ,
audio_selection = selector ,
)
fim = make_format_im ( selector )
processor . process_format_initialization_metadata ( fim )
# Clear consumed ranges. We want to also handle the case when we cannot mark the format as entirely consumed.
processor . initialized_formats [ str ( selector . format_ids [ 0 ] ) ] . consumed_ranges . clear ( )
media_header = make_media_header ( selector , sequence_no = 1 )
processor . process_media_header ( media_header )
processor . process_media ( media_header . header_id , media_header . content_length , io . BytesIO ( b ' example-data ' ) )
segment = processor . partial_segments [ media_header . header_id ]
result = processor . process_media_end ( media_header . header_id )
assert isinstance ( result , ProcessMediaEndResult )
assert result . sabr_part is None
# New segment created, but discarded. Not previously consumed.
assert result . is_new_segment is True
assert media_header . header_id not in processor . partial_segments
init_format = processor . initialized_formats [ str ( selector . format_ids [ 0 ] ) ]
assert init_format . init_segment is None
assert init_format . current_segment is segment
assert len ( init_format . consumed_ranges ) == 1
def test_init_segment_discard ( self , base_args ) :
# Should discard the init segment if it is marked as discard.
selector = make_selector ( ' audio ' , discard_media = True )
processor = SabrProcessor (
* * base_args ,
audio_selection = selector ,
)
fim = make_format_im ( selector )
processor . process_format_initialization_metadata ( fim )
# Clear consumed ranges. We want to also handle the case when we cannot mark the format as entirely consumed.
processor . initialized_formats [ str ( selector . format_ids [ 0 ] ) ] . consumed_ranges . clear ( )
media_header = make_init_header ( selector )
processor . process_media_header ( media_header )
processor . process_media ( media_header . header_id , media_header . content_length , io . BytesIO ( b ' example-init-data ' ) )
segment = processor . partial_segments [ media_header . header_id ]
result = processor . process_media_end ( media_header . header_id )
assert isinstance ( result , ProcessMediaEndResult )
assert result . sabr_part is None
# New segment created, but discarded. Not previously consumed.
assert result . is_new_segment is True
assert media_header . header_id not in processor . partial_segments
init_format = processor . initialized_formats [ str ( selector . format_ids [ 0 ] ) ]
assert init_format . init_segment is segment
assert init_format . current_segment is None
assert len ( init_format . consumed_ranges ) == 0
def test_media_segment_consumed ( self , base_args ) :
# Should mark the segment as consumed (and discard) if it is already in consumed ranges
selector = make_selector ( ' audio ' )
processor = SabrProcessor (
* * base_args ,
audio_selection = selector ,
)
fim = make_format_im ( selector )
processor . process_format_initialization_metadata ( fim )
media_header = make_media_header ( selector , sequence_no = 1 )
# Simulate that the segment is already consumed
processor . initialized_formats [ str ( selector . format_ids [ 0 ] ) ] . consumed_ranges . append (
ConsumedRange (
start_sequence_number = 1 ,
end_sequence_number = 2 ,
start_time_ms = media_header . start_ms ,
duration_ms = media_header . duration_ms + 500 ,
) )
processor . process_media_header ( media_header )
processor . process_media ( media_header . header_id , media_header . content_length , io . BytesIO ( b ' example-data ' ) )
segment = processor . partial_segments [ media_header . header_id ]
result = processor . process_media_end ( media_header . header_id )
assert isinstance ( result , ProcessMediaEndResult )
assert result . sabr_part is None
assert result . is_new_segment is False
assert media_header . header_id not in processor . partial_segments
init_format = processor . initialized_formats [ str ( selector . format_ids [ 0 ] ) ]
assert init_format . init_segment is None
assert init_format . current_segment is segment
assert len ( init_format . consumed_ranges ) == 1
assert init_format . consumed_ranges [ 0 ] == ConsumedRange (
start_sequence_number = 1 ,
end_sequence_number = 2 ,
start_time_ms = media_header . start_ms ,
duration_ms = media_header . duration_ms + 500 ,
)
def test_init_segment_consumed ( self , base_args ) :
# Should mark the init segment as consumed (and discard) if already seen init segment
selector = make_selector ( ' audio ' )
processor = SabrProcessor (
* * base_args ,
audio_selection = selector ,
)
fim = make_format_im ( selector )
processor . process_format_initialization_metadata ( fim )
media_header = make_init_header ( selector )
# Simulate that the init segment is already consumed
processor . initialized_formats [ str ( selector . format_ids [ 0 ] ) ] . init_segment = Segment (
is_init_segment = True ,
format_id = selector . format_ids [ 0 ] ,
sequence_number = None ,
)
processor . process_media_header ( media_header )
processor . process_media ( media_header . header_id , media_header . content_length , io . BytesIO ( b ' example-init-data ' ) )
segment = processor . partial_segments [ media_header . header_id ]
result = processor . process_media_end ( media_header . header_id )
assert isinstance ( result , ProcessMediaEndResult )
assert result . sabr_part is None
assert result . is_new_segment is False
assert media_header . header_id not in processor . partial_segments
init_format = processor . initialized_formats [ str ( selector . format_ids [ 0 ] ) ]
assert init_format . init_segment is segment # xxx: Should we be taking the new init segment we discarded?
assert init_format . current_segment is None
assert len ( init_format . consumed_ranges ) == 0
def test_media_end_no_content_length ( self , base_args ) :
# Should not raise an error if segment does not have a content length
selector = make_selector ( ' audio ' )
processor = SabrProcessor (
* * base_args ,
audio_selection = selector ,
)
fim = make_format_im ( selector )
processor . process_format_initialization_metadata ( fim )
media_header = make_media_header ( selector , sequence_no = 1 )
media_header . content_length = None
processor . process_media_header ( media_header )
processor . process_media ( media_header . header_id , 500 , io . BytesIO ( b ' example-data ' ) )
result = processor . process_media_end ( media_header . header_id )
assert isinstance ( result , ProcessMediaEndResult )
assert isinstance ( result . sabr_part , MediaSegmentEndSabrPart )
def test_media_end_no_partial_segment ( self , base_args ) :
# Should raise an error if no partial segment found for the header_id
selector = make_selector ( ' audio ' )
processor = SabrProcessor (
* * base_args ,
audio_selection = selector ,
)
fim = make_format_im ( selector )
processor . process_format_initialization_metadata ( fim )
with pytest . raises ( SabrStreamError , match = ' Header ID 12345 not found in partial segments ' ) :
processor . process_media_end ( 12345 )
def test_content_length_mismatch ( self , base_args ) :
# Should raise an error if content length does not match the expected length
selector = make_selector ( ' audio ' )
processor = SabrProcessor (
* * base_args ,
audio_selection = selector ,
)
fim = make_format_im ( selector )
processor . process_format_initialization_metadata ( fim )
media_header = make_media_header ( selector , sequence_no = 1 )
processor . process_media_header ( media_header )
with pytest . raises ( SabrStreamError , match = ' Content length mismatch ' ) :
processor . process_media_end ( media_header . header_id )
assert media_header . header_id not in processor . partial_segments
init_format = processor . initialized_formats [ str ( selector . format_ids [ 0 ] ) ]
assert init_format . init_segment is None
assert init_format . current_segment is None
def test_estimated_content_length_mismatch ( self , base_args , logger ) :
# Should not raise an error if estimated content length does not match, rather log in trace
selector = make_selector ( ' audio ' )
processor = SabrProcessor (
* * base_args ,
audio_selection = selector ,
)
processor . is_live = True
fim = make_format_im ( selector )
processor . process_format_initialization_metadata ( fim )
media_header = make_media_header ( selector , sequence_no = 1 )
media_header . content_length = None
media_header . bitrate_bps = 1000000
media_header . duration_ms = 4000
processor . process_media_header ( media_header )
processor . process_media (
media_header . header_id ,
content_length = 500 , # Mismatch between what is estimated
data = io . BytesIO ( b ' example-data ' ) ,
)
segment = processor . partial_segments [ media_header . header_id ]
result = processor . process_media_end ( media_header . header_id )
assert isinstance ( result , ProcessMediaEndResult )
assert isinstance ( result . sabr_part , MediaSegmentEndSabrPart )
assert result . sabr_part == MediaSegmentEndSabrPart (
format_selector = selector ,
format_id = selector . format_ids [ 0 ] ,
sequence_number = 1 ,
is_init_segment = False ,
total_segments = fim . total_segments ,
)
assert result . is_new_segment is True
assert media_header . header_id not in processor . partial_segments
init_format = processor . initialized_formats [ str ( selector . format_ids [ 0 ] ) ]
assert init_format . init_segment is None
assert init_format . current_segment is segment
assert init_format . current_segment . content_length_estimated is True
assert init_format . current_segment . content_length == 4000000
assert init_format . current_segment . received_data_length == 500
logger . trace . assert_called_with (
f ' Content length for { segment . format_id } (sequence 1) was estimated, '
f ' estimated { segment . content_length } bytes, got { segment . received_data_length } bytes ' )