mirror of https://github.com/ekimekim/wubloader
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
328 lines
13 KiB
Python
328 lines
13 KiB
Python
|
|
# This file was taken from https://github.com/streamlink/streamlink/blob/30043408c74ddbb152e694b5afe1f185e6664b26/src/streamlink/stream/hls_playlist.py
|
|
# It is under the following licence:
|
|
|
|
# Copyright (c) 2011-2016, Christopher Rosell
|
|
# Copyright (c) 2016-2018, Streamlink Team
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are met:
|
|
#
|
|
# 1. Redistributions of source code must retain the above copyright notice, this
|
|
# list of conditions and the following disclaimer.
|
|
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
# this list of conditions and the following disclaimer in the documentation
|
|
# and/or other materials provided with the distribution.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
|
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
|
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
|
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
|
|
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
|
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
|
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
|
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
|
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
|
|
import re
|
|
|
|
from binascii import unhexlify
|
|
from collections import namedtuple
|
|
from itertools import starmap
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
|
|
# Lightweight immutable records for the pieces of a parsed playlist.
# Field order matters: the parser builds all of these positionally.

# EXT-X-BYTERANGE
ByteRange = namedtuple("ByteRange", ["range", "offset"])

# EXT-X-KEY
Key = namedtuple(
    "Key",
    ["method", "uri", "iv", "key_format", "key_format_versions"],
)

# EXT-X-MAP
Map = namedtuple("Map", ["uri", "byterange"])

# EXT-X-MEDIA
Media = namedtuple(
    "Media",
    [
        "uri",
        "type",
        "group_id",
        "language",
        "name",
        "default",
        "autoselect",
        "forced",
        "characteristics",
    ],
)

# EXT-X-START
Start = namedtuple("Start", ["time_offset", "precise"])

# EXT-X-STREAM-INF
StreamInfo = namedtuple(
    "StreamInfo",
    [
        "bandwidth",
        "program_id",
        "codecs",
        "resolution",
        "audio",
        "video",
        "subtitles",
    ],
)

# EXT-X-I-FRAME-STREAM-INF
IFrameStreamInfo = namedtuple(
    "IFrameStreamInfo",
    ["bandwidth", "program_id", "codecs", "resolution", "video"],
)

# Records that do not map one-to-one onto a single tag.
Playlist = namedtuple("Playlist", ["uri", "stream_info", "media", "is_iframe"])
Resolution = namedtuple("Resolution", ["width", "height"])
Segment = namedtuple(
    "Segment",
    [
        "uri",
        "duration",
        "title",
        "key",
        "discontinuity",
        "byterange",
        "date",
        "map",
        "ad_reason",
    ],
)
|
|
|
|
|
|
class M3U8:
    """Holds everything the parser extracted from one M3U8 playlist.

    A fresh instance is "empty": both flags are False, every scalar
    tag value is None and the three collections are empty lists.
    """

    def __init__(self):
        # Flags describing the playlist as a whole.
        self.is_endlist, self.is_master = False, False

        # Values of playlist-level tags; None until the parser fills them in.
        self.allow_cache = None
        self.discontinuity_sequence = None
        self.iframes_only = None
        self.media_sequence = None
        self.playlist_type = None
        self.target_duration = None
        self.start = None
        self.version = None

        # Collected entries. Each attribute gets its own list object.
        self.media = list()
        self.playlists = list()
        self.segments = list()
|
|
|
|
|
|
class M3U8Parser:
    """Line-oriented parser that turns M3U8 playlist text into an ``M3U8``.

    Call :meth:`parse` with the playlist text. While parsing, ``self.state``
    carries information from tag lines forward to the URI line they apply
    to, and ``self.m3u8`` is the result object being filled in.

    *base_uri*, if given, is used to resolve relative URIs found in the
    playlist (see :meth:`uri`).
    """

    _extinf_re = re.compile(r"(?P<duration>\d+(\.\d+)?)(,(?P<title>.+))?")
    _attr_re = re.compile(r"([A-Z\-]+)=(\d+\.\d+|0x[0-9A-z]+|\d+x\d+|\d+|\"(.+?)\"|[0-9A-z\-]+)")
    _range_re = re.compile(r"(?P<range>\d+)(@(?P<offset>.+))?")
    _tag_re = re.compile(r"#(?P<tag>[\w-]+)(:(?P<value>.+))?")
    _res_re = re.compile(r"(\d+)x(\d+)")

    def __init__(self, base_uri=None):
        self.base_uri = base_uri

    def create_stream_info(self, streaminf, cls=None):
        """Build a stream-info record from a dict of tag attributes.

        Returns an ``IFrameStreamInfo`` when *cls* is ``IFrameStreamInfo``,
        otherwise a ``StreamInfo``. BANDWIDTH is converted to float,
        RESOLUTION to a ``Resolution`` and CODECS to a list (empty when the
        attribute is absent).
        """
        program_id = streaminf.get("PROGRAM-ID")

        bandwidth = streaminf.get("BANDWIDTH")
        if bandwidth:
            bandwidth = float(bandwidth)

        resolution = streaminf.get("RESOLUTION")
        if resolution:
            resolution = self.parse_resolution(resolution)

        codecs = streaminf.get("CODECS")
        codecs = codecs.split(",") if codecs else []

        if cls is IFrameStreamInfo:
            return IFrameStreamInfo(bandwidth, program_id, codecs, resolution,
                                    streaminf.get("VIDEO"))

        return StreamInfo(bandwidth, program_id, codecs, resolution,
                          streaminf.get("AUDIO"), streaminf.get("VIDEO"),
                          streaminf.get("SUBTITLES"))

    def split_tag(self, line):
        """Split ``#TAG:value`` into ``(tag, value)``.

        The value is stripped and is ``""`` when the tag has none.
        Returns ``(None, None)`` if *line* does not look like a tag.
        """
        match = self._tag_re.match(line)
        if match:
            return match.group("tag"), (match.group("value") or "").strip()

        return None, None

    def parse_attributes(self, value):
        """Parse an attribute list (``KEY=value,KEY="quoted"``) into a dict.

        Quoted values are stored without their surrounding quotes; all
        values are kept as strings.
        """
        # Each match is (key, raw value, contents of the quotes or "").
        return {
            key: quoted or raw
            for key, raw, quoted in self._attr_re.findall(value)
        }

    def parse_bool(self, value):
        """Return True only for the literal string ``"YES"``."""
        return value == "YES"

    def parse_byterange(self, value):
        """Parse ``length[@offset]`` into a ``ByteRange``.

        A missing offset becomes 0. Returns None when *value* does not
        start with a length.
        """
        match = self._range_re.match(value)
        if match:
            return ByteRange(int(match.group("range")),
                             int(match.group("offset") or 0))

        return None

    def parse_extinf(self, value):
        """Parse an EXTINF value into ``(duration, title)``.

        The title is None when absent; ``(0, None)`` is returned when the
        value does not start with a number.
        """
        match = self._extinf_re.match(value)
        if match:
            return float(match.group("duration")), match.group("title")

        return (0, None)

    def parse_hex(self, value):
        """Convert a ``0x``-prefixed hex string to bytes."""
        value = value[2:]  # drop the two-character "0x" prefix
        if len(value) % 2:
            # unhexlify needs an even number of digits
            value = "0" + value

        return unhexlify(value)

    def parse_resolution(self, value):
        """Parse ``WIDTHxHEIGHT`` into a ``Resolution`` (0x0 if malformed)."""
        match = self._res_re.match(value)
        if match:
            width, height = int(match.group(1)), int(match.group(2))
        else:
            width, height = 0, 0

        return Resolution(width, height)

    def parse_tag(self, line, transform=None):
        """Return the value part of a tag line, passed through *transform*.

        Without *transform* the raw value is returned (None if *line* is
        not a tag, as reported by :meth:`split_tag`).
        """
        _, value = self.split_tag(line)

        if transform:
            value = transform(value)

        return value

    def parse_line(self, line):
        """Process one non-empty playlist line, updating state and result.

        Tag lines either set a field on ``self.m3u8`` or stash a value in
        ``self.state``; a non-tag (URI) line consumes the stashed values
        and appends a ``Segment`` or ``Playlist``. Lines starting with
        ``#`` that match no known tag are ignored.

        Tags are recognised by prefix, so wherever one tag name is a prefix
        of another the longer name must be tested first.
        """
        if not line.startswith("#"):
            if self.state.pop("expect_segment", None):
                # Per-segment values are popped so they apply only once.
                byterange = self.state.pop("byterange", None)
                duration, title = self.state.pop("extinf", (0, None))
                date = self.state.pop("date", None)
                # These stay in state and keep applying to later segments.
                map_ = self.state.get("map")
                key = self.state.get("key")
                scte35 = self.state.get("scte35")

                if scte35:
                    ad_reason = "Contains scte35 data: {}".format(scte35)
                elif title and title.startswith("Amazon"):
                    ad_reason = "Title begins with 'Amazon': {}".format(title)
                else:
                    ad_reason = None  # not an ad

                segment = Segment(self.uri(line), duration,
                                  title, key,
                                  self.state.pop("discontinuity", False),
                                  byterange, date, map_, ad_reason)
                self.m3u8.segments.append(segment)
            elif self.state.pop("expect_playlist", None):
                streaminf = self.state.pop("streaminf", {})
                stream_info = self.create_stream_info(streaminf)
                playlist = Playlist(self.uri(line), stream_info, [], False)
                self.m3u8.playlists.append(playlist)
        elif line.startswith("#EXTINF"):
            self.state["expect_segment"] = True
            self.state["extinf"] = self.parse_tag(line, self.parse_extinf)
        elif line.startswith("#EXT-X-BYTERANGE"):
            self.state["expect_segment"] = True
            self.state["byterange"] = self.parse_tag(line, self.parse_byterange)
        elif line.startswith("#EXT-X-TARGETDURATION"):
            self.m3u8.target_duration = self.parse_tag(line, int)
        elif line.startswith("#EXT-X-MEDIA-SEQUENCE"):
            # Must precede the "#EXT-X-MEDIA" check below (shared prefix).
            self.m3u8.media_sequence = self.parse_tag(line, int)
        elif line.startswith("#EXT-X-KEY"):
            attr = self.parse_tag(line, self.parse_attributes)
            iv = attr.get("IV")
            if iv:
                iv = self.parse_hex(iv)
            self.state["key"] = Key(attr.get("METHOD"),
                                    self.uri(attr.get("URI")),
                                    iv, attr.get("KEYFORMAT"),
                                    attr.get("KEYFORMATVERSIONS"))
        elif line.startswith("#EXT-X-PROGRAM-DATE-TIME"):
            self.state["date"] = self.parse_tag(line)
        elif line.startswith("#EXT-X-ALLOW-CACHE"):
            self.m3u8.allow_cache = self.parse_tag(line, self.parse_bool)
        elif line.startswith("#EXT-X-STREAM-INF"):
            self.state["streaminf"] = self.parse_tag(line, self.parse_attributes)
            self.state["expect_playlist"] = True
        elif line.startswith("#EXT-X-PLAYLIST-TYPE"):
            self.m3u8.playlist_type = self.parse_tag(line)
        elif line.startswith("#EXT-X-ENDLIST"):
            self.m3u8.is_endlist = True
        elif line.startswith("#EXT-X-MEDIA"):
            attr = self.parse_tag(line, self.parse_attributes)
            media = Media(self.uri(attr.get("URI")), attr.get("TYPE"),
                          attr.get("GROUP-ID"), attr.get("LANGUAGE"),
                          attr.get("NAME"),
                          self.parse_bool(attr.get("DEFAULT")),
                          self.parse_bool(attr.get("AUTOSELECT")),
                          self.parse_bool(attr.get("FORCED")),
                          attr.get("CHARACTERISTICS"))
            self.m3u8.media.append(media)
        elif line.startswith("#EXT-X-DISCONTINUITY-SEQUENCE"):
            # Must precede the plain "#EXT-X-DISCONTINUITY" check: that
            # prefix also matches this tag, which would otherwise be
            # mistaken for a discontinuity marker and never recorded.
            self.m3u8.discontinuity_sequence = self.parse_tag(line, int)
        elif line.startswith("#EXT-X-DISCONTINUITY"):
            self.state["discontinuity"] = True
            self.state["map"] = None
        elif line.startswith("#EXT-X-I-FRAMES-ONLY"):
            self.m3u8.iframes_only = True
        elif line.startswith("#EXT-X-MAP"):
            attr = self.parse_tag(line, self.parse_attributes)
            byterange = self.parse_byterange(attr.get("BYTERANGE", ""))
            self.state["map"] = Map(self.uri(attr.get("URI")), byterange)
        elif line.startswith("#EXT-X-I-FRAME-STREAM-INF"):
            attr = self.parse_tag(line, self.parse_attributes)
            streaminf = self.state.pop("streaminf", attr)
            stream_info = self.create_stream_info(streaminf, IFrameStreamInfo)
            playlist = Playlist(self.uri(attr.get("URI")), stream_info, [], True)
            self.m3u8.playlists.append(playlist)
        elif line.startswith("#EXT-X-VERSION"):
            self.m3u8.version = self.parse_tag(line, int)
        elif line.startswith("#EXT-X-START"):
            attr = self.parse_tag(line, self.parse_attributes)
            start = Start(attr.get("TIME-OFFSET"),
                          self.parse_bool(attr.get("PRECISE", "NO")))
            self.m3u8.start = start
        elif line.startswith("#EXT-X-SCTE35-OUT"):
            # marks start of ad, with optional URL
            attr = self.parse_tag(line, self.parse_attributes)
            self.state["scte35"] = attr.get('URL') or "unknown"
        elif line.startswith("#EXT-X-SCTE35-IN"):
            # marks end of ad
            self.state["scte35"] = None

    def parse(self, data):
        """Parse playlist text *data* and return the resulting ``M3U8``.

        Empty lines are skipped. Input with no non-empty lines yields an
        empty ``M3U8``.

        Raises:
            ValueError: if the first non-empty line is not ``#EXTM3U``.
        """
        self.state = {}
        self.m3u8 = M3U8()

        lines = iter(filter(bool, data.splitlines()))
        # Lines are non-empty strings here, so None safely means "no lines".
        header = next(lines, None)
        if header is None:
            return self.m3u8
        if not header.startswith("#EXTM3U"):
            raise ValueError("Missing #EXTM3U header")

        for line in lines:
            self.parse_line(line)

        # Associate Media entries with each Playlist
        for playlist in self.m3u8.playlists:
            for media_type in ("audio", "video", "subtitles"):
                # IFrameStreamInfo has no audio/subtitles field, hence the
                # getattr default.
                group_id = getattr(playlist.stream_info, media_type, None)
                if group_id:
                    playlist.media.extend(
                        media for media in self.m3u8.media
                        if media.group_id == group_id
                    )

        self.m3u8.is_master = bool(self.m3u8.playlists)

        return self.m3u8

    def uri(self, uri):
        """Resolve *uri* against ``base_uri`` when it is relative.

        URIs that already have a scheme, empty/None URIs, and any URI
        when no ``base_uri`` is set are returned unchanged.
        """
        if uri and urlparse(uri).scheme:
            return uri
        if self.base_uri and uri:
            return urljoin(self.base_uri, uri)

        return uri
|
|
|
|
|
|
def load(data, base_uri=None, parser=M3U8Parser):
    """Parse a string of M3U8 playlist data and return the result.

    *base_uri*, when given, is handed to the parser as the base that
    relative URIs are joined with; without it relative URIs are kept
    as they appear.

    *parser* is the class used to do the parsing (M3U8Parser or a
    subclass of it). It is instantiated with *base_uri* and its
    ``parse`` method is called with *data*.
    """
    playlist_parser = parser(base_uri)
    return playlist_parser.parse(data)
|