Skip to content

Commit

Permalink
api & web: merge base queue ui & api updates
Browse files Browse the repository at this point in the history
  • Loading branch information
wukko committed Jan 8, 2025
2 parents 50db4d3 + 45e7b69 commit a6069f4
Show file tree
Hide file tree
Showing 28 changed files with 1,047 additions and 156 deletions.
56 changes: 15 additions & 41 deletions api/src/core/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@ import jwt from "../security/jwt.js";
import stream from "../stream/stream.js";
import match from "../processing/match.js";

import { env, isCluster, setTunnelPort } from "../config.js";
import { env } from "../config.js";
import { extract } from "../processing/url.js";
import { Green, Bright, Cyan } from "../misc/console-text.js";
import { Bright, Cyan } from "../misc/console-text.js";
import { hashHmac } from "../security/secrets.js";
import { createStore } from "../store/redis-ratelimit.js";
import { randomizeCiphers } from "../misc/randomize-ciphers.js";
import { verifyTurnstileToken } from "../security/turnstile.js";
import { friendlyServiceName } from "../processing/service-alias.js";
import { verifyStream, getInternalStream } from "../stream/manage.js";
import { verifyStream } from "../stream/manage.js";
import { createResponse, normalizeRequest, getIP } from "../processing/request.js";
import * as APIKeys from "../security/api-keys.js";
import * as Cookies from "../processing/cookie/manager.js";
import { setupTunnelHandler } from "./itunnel.js";

const git = {
branch: await getBranch(),
Expand Down Expand Up @@ -263,6 +264,15 @@ export const runAPI = async (express, app, __dirname, isPrimary = true) => {
}
})

app.use('/tunnel', cors({
methods: ['GET'],
exposedHeaders: [
'Estimated-Content-Length',
'Content-Disposition'
],
...corsConfig,
}));

app.get('/tunnel', apiTunnelLimiter, async (req, res) => {
const id = String(req.query.id);
const exp = String(req.query.exp);
Expand Down Expand Up @@ -292,31 +302,7 @@ export const runAPI = async (express, app, __dirname, isPrimary = true) => {
}

return stream(res, streamInfo);
})

const itunnelHandler = (req, res) => {
if (!req.ip.endsWith('127.0.0.1')) {
return res.sendStatus(403);
}

if (String(req.query.id).length !== 21) {
return res.sendStatus(400);
}

const streamInfo = getInternalStream(req.query.id);
if (!streamInfo) {
return res.sendStatus(404);
}

streamInfo.headers = new Map([
...(streamInfo.headers || []),
...Object.entries(req.headers)
]);

return stream(res, { type: 'internal', ...streamInfo });
};

app.get('/itunnel', itunnelHandler);
});

app.get('/', (_, res) => {
res.type('json');
Expand Down Expand Up @@ -378,17 +364,5 @@ export const runAPI = async (express, app, __dirname, isPrimary = true) => {
}
});

if (isCluster) {
const istreamer = express();
istreamer.get('/itunnel', itunnelHandler);
const server = istreamer.listen({
port: 0,
host: '127.0.0.1',
exclusive: true
}, () => {
const { port } = server.address();
console.log(`${Green('[✓]')} cobalt sub-instance running on 127.0.0.1:${port}`);
setTunnelPort(port);
});
}
setupTunnelHandler();
}
61 changes: 61 additions & 0 deletions api/src/core/itunnel.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import stream from "../stream/stream.js";
import { getInternalTunnel } from "../stream/manage.js";
import { setTunnelPort } from "../config.js";
import { Green } from "../misc/console-text.js";
import express from "express";

const validateTunnel = (req, res) => {
if (!req.ip.endsWith('127.0.0.1')) {
res.sendStatus(403);
return;
}

if (String(req.query.id).length !== 21) {
res.sendStatus(400);
return;
}

const streamInfo = getInternalTunnel(req.query.id);
if (!streamInfo) {
res.sendStatus(404);
return;
}

return streamInfo;
}

const streamTunnel = (req, res) => {
const streamInfo = validateTunnel(req, res);
if (!streamInfo) {
return;
}

streamInfo.headers = new Map([
...(streamInfo.headers || []),
...Object.entries(req.headers)
]);

return stream(res, { type: 'internal', ...streamInfo });
}

export const setupTunnelHandler = () => {
const tunnelHandler = express();

tunnelHandler.get('/itunnel', streamTunnel);

// fallback
tunnelHandler.use((_, res) => res.sendStatus(400));
// error handler
tunnelHandler.use((_, __, res, ____) => res.socket.end());


const server = tunnelHandler.listen({
port: 0,
host: '127.0.0.1',
exclusive: true
}, () => {
const { port } = server.address();
console.log(`${Green('[✓]')} internal tunnel handler running on 127.0.0.1:${port}`);
setTunnelPort(port);
});
}
3 changes: 2 additions & 1 deletion api/src/processing/services/bilibili.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ async function com_download(id) {
return {
urls: [video.baseUrl, audio.baseUrl],
audioFilename: `bilibili_${id}_audio`,
filename: `bilibili_${id}_${video.width}x${video.height}.mp4`
filename: `bilibili_${id}_${video.width}x${video.height}.mp4`,
isHLS: true
};
}

Expand Down
64 changes: 62 additions & 2 deletions api/src/stream/internal-hls.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import HLS from "hls-parser";
import { createInternalStream } from "./manage.js";
import { request } from "undici";

function getURL(url) {
try {
Expand Down Expand Up @@ -55,8 +56,11 @@ function transformMediaPlaylist(streamInfo, hlsPlaylist) {

const HLS_MIME_TYPES = ["application/vnd.apple.mpegurl", "audio/mpegurl", "application/x-mpegURL"];

export function isHlsResponse (req) {
return HLS_MIME_TYPES.includes(req.headers['content-type']);
export function isHlsResponse(req, streamInfo) {
return HLS_MIME_TYPES.includes(req.headers['content-type'])
// bluesky's cdn responds with wrong content-type for the hls playlist,
// so we enforce it here until they fix it
|| (streamInfo.service === 'bsky' && streamInfo.url.endsWith('.m3u8'));
}

export async function handleHlsPlaylist(streamInfo, req, res) {
Expand All @@ -71,3 +75,59 @@ export async function handleHlsPlaylist(streamInfo, req, res) {

res.send(hlsPlaylist);
}

async function getSegmentSize(url, config) {
const segmentResponse = await request(url, {
...config,
throwOnError: true
});

if (segmentResponse.headers['content-length']) {
segmentResponse.body.dump();
return +segmentResponse.headers['content-length'];
}

// if the response does not have a content-length
// header, we have to compute it ourselves
let size = 0;

for await (const data of segmentResponse.body) {
size += data.length;
}

return size;
}

export async function probeInternalHLSTunnel(streamInfo) {
const { url, headers, dispatcher, signal } = streamInfo;

// remove all falsy headers
Object.keys(headers).forEach(key => {
if (!headers[key]) delete headers[key];
});

const config = { headers, dispatcher, signal, maxRedirections: 16 };

const manifestResponse = await fetch(url, config);

const manifest = HLS.parse(await manifestResponse.text());
if (manifest.segments.length === 0)
return -1;

const segmentSamples = await Promise.all(
Array(5).fill().map(async () => {
const manifestIdx = Math.floor(Math.random() * manifest.segments.length);
const randomSegment = manifest.segments[manifestIdx];
if (!randomSegment.uri)
throw "segment is missing URI";

const segmentSize = await getSegmentSize(randomSegment.uri, config) / randomSegment.duration;
return segmentSize;
})
);

const averageBitrate = segmentSamples.reduce((a, b) => a + b) / segmentSamples.length;
const totalDuration = manifest.segments.reduce((acc, segment) => acc + segment.duration, 0);

return averageBitrate * totalDuration;
}
44 changes: 39 additions & 5 deletions api/src/stream/internal.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { request } from "undici";
import { Readable } from "node:stream";
import { closeRequest, getHeaders, pipe } from "./shared.js";
import { handleHlsPlaylist, isHlsResponse } from "./internal-hls.js";
import { handleHlsPlaylist, isHlsResponse, probeInternalHLSTunnel } from "./internal-hls.js";

const CHUNK_SIZE = BigInt(8e6); // 8 MB
const min = (a, b) => a < b ? a : b;
Expand Down Expand Up @@ -96,10 +96,7 @@ async function handleGenericStream(streamInfo, res) {
res.status(fileResponse.statusCode);
fileResponse.body.on('error', () => {});

// bluesky's cdn responds with wrong content-type for the hls playlist,
// so we enforce it here until they fix it
const isHls = isHlsResponse(fileResponse)
|| (streamInfo.service === "bsky" && streamInfo.url.endsWith('.m3u8'));
const isHls = isHlsResponse(fileResponse, streamInfo);

for (const [ name, value ] of Object.entries(fileResponse.headers)) {
if (!isHls || name.toLowerCase() !== 'content-length') {
Expand Down Expand Up @@ -133,3 +130,40 @@ export function internalStream(streamInfo, res) {

return handleGenericStream(streamInfo, res);
}

export async function probeInternalTunnel(streamInfo) {
try {
const signal = AbortSignal.timeout(3000);
const headers = {
...Object.fromEntries(streamInfo.headers || []),
...getHeaders(streamInfo.service),
host: undefined,
range: undefined
};

if (streamInfo.isHLS) {
return probeInternalHLSTunnel({
...streamInfo,
signal,
headers
});
}

const response = await request(streamInfo.url, {
method: 'HEAD',
headers,
dispatcher: streamInfo.dispatcher,
signal,
maxRedirections: 16
});

if (response.statusCode !== 200)
throw "status is not 200 OK";

const size = +response.headers['content-length'];
if (isNaN(size))
throw "content-length is not a number";

return size;
} catch {}
}
14 changes: 12 additions & 2 deletions api/src/stream/manage.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,20 @@ export function createStream(obj) {
return streamLink.toString();
}

export function getInternalStream(id) {
export function getInternalTunnel(id) {
return internalStreamCache.get(id);
}

export function getInternalTunnelFromURL(url) {
url = new URL(url);
if (url.hostname !== '127.0.0.1') {
return;
}

const id = url.searchParams.get('id');
return getInternalTunnel(id);
}

export function createInternalStream(url, obj = {}) {
assert(typeof url === 'string');

Expand Down Expand Up @@ -124,7 +134,7 @@ export function destroyInternalStream(url) {
const id = url.searchParams.get('id');

if (internalStreamCache.has(id)) {
closeRequest(getInternalStream(id)?.controller);
closeRequest(getInternalTunnel(id)?.controller);
internalStreamCache.delete(id);
}
}
Expand Down
39 changes: 39 additions & 0 deletions api/src/stream/shared.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { genericUserAgent } from "../config.js";
import { vkClientAgent } from "../processing/services/vk.js";
import { getInternalTunnelFromURL } from "./manage.js";
import { probeInternalTunnel } from "./internal.js";

const defaultHeaders = {
'user-agent': genericUserAgent
Expand Down Expand Up @@ -47,3 +49,40 @@ export function pipe(from, to, done) {

from.pipe(to);
}

export async function estimateTunnelLength(streamInfo, multiplier = 1.1) {
let urls = streamInfo.urls;
if (!Array.isArray(urls)) {
urls = [ urls ];
}

const internalTunnels = urls.map(getInternalTunnelFromURL);
if (internalTunnels.some(t => !t))
return -1;

const sizes = await Promise.all(internalTunnels.map(probeInternalTunnel));
const estimatedSize = sizes.reduce(
// if one of the sizes is missing, let's just make a very
// bold guess that it's the same size as the existing one
(acc, cur) => cur <= 0 ? acc * 2 : acc + cur,
0
);

if (isNaN(estimatedSize) || estimatedSize <= 0) {
return -1;
}

return Math.floor(estimatedSize * multiplier);
}

export function estimateAudioMultiplier(streamInfo) {
if (streamInfo.audioFormat === 'wav') {
return 1411 / 128;
}

if (streamInfo.audioCopy) {
return 1;
}

return streamInfo.audioBitrate / 128;
}
Loading

0 comments on commit a6069f4

Please sign in to comment.