Skip to content

Commit

Permalink
Precache m3u8 playlists
Browse files Browse the repository at this point in the history
  • Loading branch information
AngellusMortis committed Jul 24, 2021
1 parent 3d504e6 commit 3f95277
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
2 changes: 1 addition & 1 deletion sxm/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@ def main(
channel_name = channel.name.ljust(l3)[:l3]
typer.echo(f"{channel_id} | {channel_num} | {channel_name}")
else:
run_http_server(sxm, port, ip=host, cache_aac_chunks=precache)
run_http_server(sxm, port, ip=host, precache=precache)
return 0
71 changes: 56 additions & 15 deletions sxm/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import json
import logging
from asyncio import get_event_loop, sleep
from typing import Any, Callable, Coroutine, Dict, List
from time import monotonic
from typing import Any, Callable, Coroutine, Dict, List, Optional

from aiohttp import web

Expand All @@ -12,7 +13,7 @@


def make_http_handler(
sxm: SXMClientAsync, cache_aac_chunks: bool = True
sxm: SXMClientAsync, precache: bool = True
) -> Callable[[web.Request], Coroutine[Any, Any, web.Response]]:
"""
Creates and returns a configured `aiohttp` request handler ready to be used
Expand All @@ -28,6 +29,15 @@ def make_http_handler(
"""

aac_cache: Dict[str, bytes] = {}
playlist_cache: Dict[str, str] = {}
active_channel_id: Optional[str] = None

def set_active(channel_id: Optional[str], initial_playlist: Optional[str] = None):
active_channel_id = channel_id # pylint: disable=unused-variable # noqa

if precache and channel_id is not None and initial_playlist is not None:
loop = get_event_loop()
loop.create_task(cache_playlist(channel_id, initial_playlist.split("\n")))

async def get_segment(path: str):
try:
Expand All @@ -40,8 +50,11 @@ async def get_segment(path: str):

return data

async def cache_playlist(playlist: List[str]):
async def cache_playlist_chunks(end: float, playlist: List[str]):
for item in playlist:
if monotonic() >= end:
return

while len(aac_cache) > 10:
await sleep(1)

Expand All @@ -53,33 +66,61 @@ async def cache_playlist(playlist: List[str]):
aac_cache[item] = data
await sleep(1)

async def cache_playlist(channel_id: str, playlist: List[str]):
start = monotonic() - 3

while active_channel_id == channel_id:
await cache_playlist_chunks(start + 5, playlist)

new_playlist: Optional[str] = None
while new_playlist is None:
new_playlist = await sxm.get_playlist(channel_id)

playlist_cache[channel_id] = new_playlist
playlist = new_playlist.split("\n")
start = monotonic()

async def get_playlist_chunk(segment_path: str):
if segment_path in aac_cache:
data = aac_cache[segment_path]
del aac_cache[segment_path]
else:
data = await get_segment(segment_path)

return data

async def get_playlist(channel_id: str):
if channel_id in playlist_cache:
playlist: Optional[str] = playlist_cache[channel_id]
del playlist_cache[channel_id]
else:
playlist = await sxm.get_playlist(channel_id)

if active_channel_id != channel_id and playlist is not None:
set_active(channel_id, playlist)

return playlist

async def sxm_handler(request: web.Request):
"""SXM Response handler"""

response = web.Response(status=404)
if request.path.endswith(".m3u8"):
if not sxm.primary:
sxm.set_primary(True)
playlist = await sxm.get_playlist(request.path.rsplit("/", 1)[1][:-5])
channel_id = request.path.rsplit("/", 1)[1][:-5]
playlist = await get_playlist(channel_id)

if playlist:
if cache_aac_chunks:
loop = get_event_loop()
loop.create_task(cache_playlist(playlist.split("\n")))
response = web.Response(
status=200,
body=bytes(playlist, "utf-8"),
headers={"Content-Type": "application/x-mpegURL"},
)
else:
set_active(None)
response = web.Response(status=503)
elif request.path.endswith(".aac"):
segment_path = request.path[1:]
if segment_path in aac_cache:
data = aac_cache[segment_path]
del aac_cache[segment_path]
else:
data = await get_segment(segment_path)
data = await get_playlist_chunk(segment_path)

if data:
response = web.Response(
Expand Down Expand Up @@ -120,7 +161,7 @@ def run_http_server(
port: int,
ip="0.0.0.0", # nosec
logger: logging.Logger = None,
cache_aac_chunks: bool = True,
precache: bool = True,
) -> None:
"""
Creates and runs an instance of :class:`http.server.HTTPServer` to proxy
Expand Down

0 comments on commit 3f95277

Please sign in to comment.