|
|
|
@ -1,7 +1,15 @@
|
|
|
|
|
import io
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
from yt_dlp.extractor.youtube._streaming.ump import varint_size, read_varint, UMPDecoder, UMPPartId
|
|
|
|
|
from yt_dlp.extractor.youtube._streaming.ump import (
|
|
|
|
|
varint_size,
|
|
|
|
|
read_varint,
|
|
|
|
|
UMPDecoder,
|
|
|
|
|
UMPPartId,
|
|
|
|
|
write_varint,
|
|
|
|
|
UMPEncoder,
|
|
|
|
|
UMPPart,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize('data, expected', [
|
|
|
|
@ -21,28 +29,32 @@ def test_varint_size(data, expected):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize('data, expected', [
|
|
|
|
|
# 1 byte long varint
|
|
|
|
|
(b'\x01', 1),
|
|
|
|
|
(b'\x4F', 79),
|
|
|
|
|
# 2 byte long varint
|
|
|
|
|
(b'\x80\x01', 64),
|
|
|
|
|
(b'\x8A\x7F', 8138),
|
|
|
|
|
(b'\xBF\x7F', 8191),
|
|
|
|
|
# 3 byte long varint
|
|
|
|
|
(b'\xC0\x80\x01', 12288),
|
|
|
|
|
(b'\xDF\x7F\xFF', 2093055),
|
|
|
|
|
# 4 byte long varint
|
|
|
|
|
(b'\xE0\x80\x80\x01', 1574912),
|
|
|
|
|
(b'\xEF\x7F\xFF\xFF', 268433407),
|
|
|
|
|
# 5 byte long varint
|
|
|
|
|
(b'\xF0\x80\x80\x80\x01', 25198720),
|
|
|
|
|
(b'\xFF\x7F\xFF\xFF\xFF', 4294967167),
|
|
|
|
|
(b'\xad\x05', 365),
|
|
|
|
|
(b'\xd5\x22\x05', 42069),
|
|
|
|
|
(b'\xe0\x68\x89\x09', 10000000),
|
|
|
|
|
(b'\xf0\xff\xc9\x9a\x3b', 999999999),
|
|
|
|
|
(b'\xf0\xff\xff\xff\xff', 4294967295),
|
|
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_readvarint(data, expected):
|
|
|
|
|
assert read_varint(io.BytesIO(data)) == expected
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize('value, expected_bytes', [
|
|
|
|
|
(1, b'\x01'),
|
|
|
|
|
(365, b'\xad\x05'),
|
|
|
|
|
(42069, b'\xd5\x22\x05'),
|
|
|
|
|
(10000000, b'\xe0\x68\x89\x09'),
|
|
|
|
|
(999999999, b'\xf0\xff\xc9\x9a\x3b'),
|
|
|
|
|
(4294967295, b'\xf0\xff\xff\xff\xff'),
|
|
|
|
|
])
|
|
|
|
|
def test_writevarint(value, expected_bytes):
|
|
|
|
|
fp = io.BytesIO()
|
|
|
|
|
write_varint(fp, value)
|
|
|
|
|
assert fp.getvalue() == expected_bytes
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestUMPDecoder:
|
|
|
|
|
EXAMPLE_PART_DATA = [
|
|
|
|
|
{
|
|
|
|
@ -100,3 +112,21 @@ class TestUMPDecoder:
|
|
|
|
|
part.data.read()
|
|
|
|
|
|
|
|
|
|
assert mock_file.closed
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestUMPEncoder:
|
|
|
|
|
def test_write_part(self):
|
|
|
|
|
fp = io.BytesIO()
|
|
|
|
|
encoder = UMPEncoder(fp)
|
|
|
|
|
part = UMPPart(
|
|
|
|
|
part_id=UMPPartId.MEDIA_HEADER,
|
|
|
|
|
size=127,
|
|
|
|
|
data=io.BytesIO(b'\x01' * 127),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
encoder.write_part(part)
|
|
|
|
|
|
|
|
|
|
part_type = b'\x14' # MEDIA_HEADER part type
|
|
|
|
|
part_size = b'\x7F' # Part size of 127
|
|
|
|
|
expected_data = part_type + part_size + b'\x01' * 127
|
|
|
|
|
assert fp.getvalue() == expected_data
|
|
|
|
|