From 686a64d3812c472a1e091728a1666ac5369c74d8 Mon Sep 17 00:00:00 2001 From: Matthias Date: Wed, 23 Oct 2024 18:56:21 +0200 Subject: [PATCH] feat: FFprobe input to extract defaults. (#79) * Added ffprobe worker * Added ffprobe as first job for transcode * Added ffprobe as child job of transcode * Remove logs on complete * Changed log stream to input * No longer allow non strict indexes --- packages/api/src/index.ts | 16 +- packages/api/src/jobs.ts | 26 +- packages/api/src/types.ts | 1 - packages/app/src/components/JobState.tsx | 4 - packages/app/src/components/JobsStats.tsx | 11 - packages/app/src/pages/JobsPage.tsx | 4 +- packages/artisan/src/assert.ts | 8 + packages/artisan/src/consumer/helpers.ts | 47 +-- .../src/consumer/{tmp-dir.ts => lib/dir.ts} | 4 +- .../src/consumer/lib/worker-processor.ts | 20 + packages/artisan/src/consumer/s3.ts | 9 +- .../artisan/src/consumer/workers/ffmpeg.ts | 132 ++----- .../artisan/src/consumer/workers/ffprobe.ts | 93 +++++ .../artisan/src/consumer/workers/helpers.ts | 3 - .../artisan/src/consumer/workers/index.ts | 32 +- .../artisan/src/consumer/workers/package.ts | 57 +-- .../artisan/src/consumer/workers/transcode.ts | 366 +++++++++++++++--- packages/artisan/src/defaults.ts | 79 ++++ packages/artisan/src/producer/index.ts | 142 +++---- packages/artisan/src/types.ts | 23 +- packages/config/tsconfig.bun.json | 3 +- packages/shared/package.json | 3 +- packages/shared/src/index.ts | 5 - packages/shared/src/lang.ts | 11 + packages/stitcher/src/filters.ts | 4 + packages/stitcher/src/parser/helpers.ts | 3 + packages/stitcher/src/parser/lexical-parse.ts | 3 + packages/stitcher/src/parser/parse.ts | 8 +- packages/stitcher/src/parser/stringify.ts | 3 + packages/stitcher/src/playlist.ts | 2 +- packages/stitcher/src/presentation.ts | 2 + packages/stitcher/src/url.ts | 2 +- packages/stitcher/src/vast.ts | 7 - 33 files changed, 743 insertions(+), 390 deletions(-) create mode 100644 packages/artisan/src/assert.ts rename packages/artisan/src/consumer/{tmp-dir.ts => lib/dir.ts} (93%) create mode 100644 packages/artisan/src/consumer/lib/worker-processor.ts create mode 100644 packages/artisan/src/consumer/workers/ffprobe.ts delete mode 100644 packages/artisan/src/consumer/workers/helpers.ts create mode 100644 packages/artisan/src/defaults.ts delete mode 100644 packages/shared/src/index.ts create mode 100644 packages/shared/src/lang.ts diff --git a/packages/api/src/index.ts b/packages/api/src/index.ts index 8ed0bc20..eaa589bb 100644 --- a/packages/api/src/index.ts +++ b/packages/api/src/index.ts @@ -67,6 +67,7 @@ const app = new Elysia() description: "The source path, starting with http(s):// or s3://", }), + height: t.Optional(t.Number()), }), t.Object({ type: t.Literal("audio"), @@ -74,7 +75,8 @@ const app = new Elysia() description: "The source path, starting with http(s):// or s3://", }), - language: LangCodeSchema, + language: t.Optional(LangCodeSchema), + channels: t.Optional(t.Number()), }), t.Object({ type: t.Literal("text"), @@ -97,15 +99,17 @@ const app = new Elysia() type: t.Literal("video"), codec: VideoCodecSchema, height: t.Number(), - bitrate: t.Number({ description: "Bitrate in bps" }), - framerate: t.Number({ description: "Frames per second" }), + bitrate: t.Optional(t.Number({ description: "Bitrate in bps" })), + framerate: t.Optional( + t.Number({ description: "Frames per second" }), + ), }), t.Object({ type: t.Literal("audio"), codec: AudioCodecSchema, - bitrate: t.Number({ description: "Bitrate in bps" }), - language: LangCodeSchema, - channels: 
t.Number(), + bitrate: t.Optional(t.Number({ description: "Bitrate in bps" })), + language: t.Optional(LangCodeSchema), + channels: t.Optional(t.Number()), }), t.Object({ type: t.Literal("text"), diff --git a/packages/api/src/jobs.ts b/packages/api/src/jobs.ts index fdad0f18..3a371ece 100644 --- a/packages/api/src/jobs.ts +++ b/packages/api/src/jobs.ts @@ -13,6 +13,9 @@ function findQueueByName(name: string): Queue { function formatIdPair(id: string): [Queue, string] { const queueName = id.split("_", 1)[0]; + if (!queueName) { + throw new Error("Missing queueName as prefix when formatting id pair"); + } return [findQueueByName(queueName), id]; } @@ -95,16 +98,16 @@ async function formatJobNode(node: JobNode): Promise { progress = job.progress; } - const state = mapJobState(await job.getState(), job.returnvalue); + const state = mapJobState(await job.getState()); const failedReason = state === "failed" ? job.failedReason : undefined; - const findParentSortKey = (job: BullMQJob): number => { - const value = job.data?.metadata?.parentSortKey; + const findParentSortIndex = (job: BullMQJob): number => { + const value = job.data?.parentSortIndex; return typeof value === "number" ? value : 0; }; (children ?? []).sort( - (a, b) => findParentSortKey(a.job) - findParentSortKey(b.job), + (a, b) => findParentSortIndex(a.job) - findParentSortIndex(b.job), ); const jobChildren = await Promise.all((children ?? []).map(formatJobNode)); @@ -150,20 +153,7 @@ async function formatJobNode(node: JobNode): Promise { }; } -// Keep these in sync with ocnsumer/workers/helpers.ts in artisan, -// we can treat the result as a string literal to indicate non standard -// job states such as "skipped". -type JobReturnValueStatus = "__JOB_SKIPPED__"; - -function mapJobState( - jobState: JobState | "unknown", - maybeReturnValue?: JobReturnValueStatus, -): Job["state"] { - // We pass maybeReturnValue as "any" from the input, it's not typed. But we - // can check whether it is a defined return value for non standard job states. 
- if (maybeReturnValue === "__JOB_SKIPPED__") { - return "skipped"; - } +function mapJobState(jobState: JobState | "unknown"): Job["state"] { if (jobState === "active" || jobState === "waiting-children") { return "running"; } diff --git a/packages/api/src/types.ts b/packages/api/src/types.ts index 790f92ac..fe886299 100644 --- a/packages/api/src/types.ts +++ b/packages/api/src/types.ts @@ -11,7 +11,6 @@ export const JobSchema = t.Recursive( t.Literal("running"), t.Literal("failed"), t.Literal("completed"), - t.Literal("skipped"), ]), progress: t.Number(), createdOn: t.Number(), diff --git a/packages/app/src/components/JobState.tsx b/packages/app/src/components/JobState.tsx index 2e82b1fa..1b4c6251 100644 --- a/packages/app/src/components/JobState.tsx +++ b/packages/app/src/components/JobState.tsx @@ -2,7 +2,6 @@ import Loader from "lucide-react/icons/loader"; import CircleDotDashed from "lucide-react/icons/circle-dot-dashed"; import Check from "lucide-react/icons/check"; import X from "lucide-react/icons/x"; -import CircleOff from "lucide-react/icons/circle-off"; import { cn } from "@/lib/utils"; import type { Job } from "@/api"; @@ -16,9 +15,6 @@ export function JobState({ state }: { state: Job["state"] }) { if (state === "running") { return createCircle("bg-blue-200 text-blue-800", Loader, "animate-spin"); } - if (state === "skipped") { - return createCircle("bg-gray-200 text-gray-800", CircleOff); - } return createCircle("bg-violet-200 text-violet-800", CircleDotDashed); } diff --git a/packages/app/src/components/JobsStats.tsx b/packages/app/src/components/JobsStats.tsx index 443a32a6..6b0b7238 100644 --- a/packages/app/src/components/JobsStats.tsx +++ b/packages/app/src/components/JobsStats.tsx @@ -13,7 +13,6 @@ export function JobsStats({ jobs, filter, onChange }: JobsStatsProps) { let completed = 0; let failed = 0; let running = 0; - let skipped = 0; for (const job of jobs) { if (job.state === "completed") { @@ -25,9 +24,6 @@ export function JobsStats({ jobs, filter, onChange }: JobsStatsProps) { if (job.state === "failed") { failed += 1; } - if (job.state === "skipped") { - skipped += 1; - } } const filterJobState = (state?: Job["state"]) => { @@ -61,13 +57,6 @@ export function JobsStats({ jobs, filter, onChange }: JobsStatsProps) { active={filter.state === "running"} tooltip="Running" /> - filterJobState("skipped")} - active={filter.state === "skipped"} - tooltip="Skipped" - /> ); diff --git a/packages/app/src/pages/JobsPage.tsx b/packages/app/src/pages/JobsPage.tsx index 2ff13ecf..29f5033d 100644 --- a/packages/app/src/pages/JobsPage.tsx +++ b/packages/app/src/pages/JobsPage.tsx @@ -43,7 +43,9 @@ export function JobsPage() { {filteredJobs.length ? ( ) : ( -

-          No jobs found...
+          Nothing here but tumbleweeds... and they're not clickable.
)} diff --git a/packages/artisan/src/assert.ts b/packages/artisan/src/assert.ts new file mode 100644 index 00000000..a484f719 --- /dev/null +++ b/packages/artisan/src/assert.ts @@ -0,0 +1,8 @@ +export function assert( + value: T, + message: string = "value is null", +): asserts value is NonNullable { + if (value === null || value === undefined) { + throw Error(message); + } +} diff --git a/packages/artisan/src/consumer/helpers.ts b/packages/artisan/src/consumer/helpers.ts index c54110ec..594f4314 100644 --- a/packages/artisan/src/consumer/helpers.ts +++ b/packages/artisan/src/consumer/helpers.ts @@ -1,26 +1,7 @@ -import { Job, Queue } from "bullmq"; -import { connection } from "./env"; - -/** - * Gets the full job. This is particularly handy when you need - * children values from child jobs. - * @param job Full job - * @returns - */ -export async function getFakeJob(job: Job) { - if (!job.id) { - throw new Error("Missing job id"); - } - - const queue = new Queue(job.queueName, { connection }); - const fakeJob = await Job.fromId(queue, job.id); - - if (!fakeJob) { - throw new Error("Failed to fetch fake job"); - } - - return fakeJob; -} +import parseFilepath from "parse-filepath"; +import { downloadFile } from "./s3"; +import { Dir } from "./lib/dir"; +import type { PartialInput } from "../types"; export async function getBinaryPath(name: string) { const direct = `${process.cwd()}/bin/${name}`; @@ -40,3 +21,23 @@ export async function getBinaryPath(name: string) { `Failed to get bin dep "${name}", run scripts/bin-deps.sh to install binary dependencies.`, ); } + +export async function getInputPath(input: PartialInput, dir: Dir | string) { + const filePath = parseFilepath(input.path); + + // If the input is on S3, download the file locally. + if (filePath.dir.startsWith("s3://")) { + const inDir = dir instanceof Dir ? await dir.createTempDir() : dir; + await downloadFile(inDir, filePath.path.replace("s3://", "")); + return parseFilepath(`${inDir}/${filePath.basename}`); + } + + if ( + filePath.dir.startsWith("http://") || + filePath.dir.startsWith("https://") + ) { + return filePath; + } + + throw new Error("Failed to resolve input path"); +} diff --git a/packages/artisan/src/consumer/tmp-dir.ts b/packages/artisan/src/consumer/lib/dir.ts similarity index 93% rename from packages/artisan/src/consumer/tmp-dir.ts rename to packages/artisan/src/consumer/lib/dir.ts index 690b51a4..4db24127 100644 --- a/packages/artisan/src/consumer/tmp-dir.ts +++ b/packages/artisan/src/consumer/lib/dir.ts @@ -5,14 +5,14 @@ import * as os from "node:os"; /** * Manager for temporary directories on file system. */ -export class TmpDir { +export class Dir { private dirs_ = new Set(); /** * Create a new temporary directory. 
* @returns */ - async create() { + async createTempDir() { const dir = await fs.mkdtemp( path.join(os.tmpdir(), `superstreamer-${crypto.randomUUID()}`), ); diff --git a/packages/artisan/src/consumer/lib/worker-processor.ts b/packages/artisan/src/consumer/lib/worker-processor.ts new file mode 100644 index 00000000..571b5b95 --- /dev/null +++ b/packages/artisan/src/consumer/lib/worker-processor.ts @@ -0,0 +1,20 @@ +import { Dir } from "./dir"; +import type { Job } from "bullmq"; + +export type WorkerCallback = (params: { + job: Job; + token?: string | undefined; + dir: Dir; +}) => Promise; + +export function createWorkerProcessor(callback: WorkerCallback) { + const dir = new Dir(); + + return async (job: Job, token?: string) => { + try { + return await callback({ job, token, dir }); + } finally { + await dir.deleteAll(); + } + }; +} diff --git a/packages/artisan/src/consumer/s3.ts b/packages/artisan/src/consumer/s3.ts index 6934777d..556f3878 100644 --- a/packages/artisan/src/consumer/s3.ts +++ b/packages/artisan/src/consumer/s3.ts @@ -53,6 +53,13 @@ export async function downloadFolder(key: string, path: string) { * @param key S3 key */ export async function downloadFile(path: string, key: string) { + const name = `${path}/${basename(key)}`; + + if (await Bun.file(name).exists()) { + // If the file already exists, we have nothing to do. + return; + } + const response = await client.send( new GetObjectCommand({ Bucket: env.S3_BUCKET, @@ -60,7 +67,7 @@ export async function downloadFile(path: string, key: string) { }), ); - await writeFile(`${path}/${basename(key)}`, response.Body as Readable); + await writeFile(name, response.Body as Readable); } /** diff --git a/packages/artisan/src/consumer/workers/ffmpeg.ts b/packages/artisan/src/consumer/workers/ffmpeg.ts index ec5393c7..da581fa2 100644 --- a/packages/artisan/src/consumer/workers/ffmpeg.ts +++ b/packages/artisan/src/consumer/workers/ffmpeg.ts @@ -1,49 +1,36 @@ -import parseFilePath from "parse-filepath"; import { FFmpeggy } from "ffmpeggy"; -import { downloadFile, uploadFile } from "../s3"; -import { TmpDir } from "../tmp-dir"; -import { getBinaryPath } from "../helpers"; -import { JOB_SKIPPED } from "./helpers"; -import type { FFprobeResult } from "ffmpeggy"; -import type { Job } from "bullmq"; +import { uploadFile } from "../s3"; +import { getBinaryPath, getInputPath } from "../helpers"; +import type { WorkerCallback } from "../lib/worker-processor"; import type { Stream, Input } from "../../types"; -import type { SkippableJobResult } from "./helpers"; const ffmpegBin = await getBinaryPath("ffmpeg"); -const ffprobeBin = await getBinaryPath("ffprobe"); FFmpeggy.DefaultConfig = { ...FFmpeggy.DefaultConfig, ffmpegBin, - ffprobeBin, }; export type FfmpegData = { - params: { - input: Input; - stream: Stream; - segmentSize: number; - assetId: string; - }; - metadata: { - parentSortKey: number; - }; + input: Input; + stream: Stream; + segmentSize: number; + assetId: string; + parentSortIndex: number; }; -export type FfmpegResult = SkippableJobResult<{ +export type FfmpegResult = { name: string; stream: Stream; -}>; - -async function runJob( - job: Job, - tmpDir: TmpDir, -): Promise { - const { params } = job.data; +}; - const outDir = await tmpDir.create(); +export const ffmpegCallback: WorkerCallback = async ({ + job, + dir, +}) => { + const outDir = await dir.createTempDir(); - const inputFile = await getInput(job, tmpDir, params.input); + const inputFile = await getInputPath(job.data.input, dir); job.log(`Input is ${inputFile.path}`); @@ 
-53,37 +40,22 @@ async function runJob( }); let name: string | undefined; - const outputOptions: string[] = []; - if (params.stream.type === "video") { - const inputInfo = await FFmpeggy.probe(inputFile.path); - job.log(`Probed info (${JSON.stringify(inputInfo)})`); - - const maxHeight = getMaxHeight(inputInfo); + const { stream } = job.data; - if (maxHeight > 0 && params.stream.height > maxHeight) { - job.log( - `Skip upscale, requested ${params.stream.height} is larger than input ${maxHeight}`, - ); - return JOB_SKIPPED; - } - - name = `video_${params.stream.height}_${params.stream.bitrate}_${params.stream.codec}.m4v`; - outputOptions.push( - ...getVideoOutputOptions(params.stream, params.segmentSize), - ); + if (stream.type === "video") { + name = `video_${stream.height}_${stream.bitrate}_${stream.codec}.m4v`; + outputOptions.push(...getVideoOutputOptions(stream, job.data.segmentSize)); } - if (params.stream.type === "audio") { - name = `audio_${params.stream.language}_${params.stream.bitrate}_${params.stream.codec}.m4a`; - outputOptions.push( - ...getAudioOutputOptions(params.stream, params.segmentSize), - ); + if (stream.type === "audio") { + name = `audio_${stream.language}_${stream.bitrate}_${stream.codec}.m4a`; + outputOptions.push(...getAudioOutputOptions(stream, job.data.segmentSize)); } - if (params.stream.type === "text") { - name = `text_${params.stream.language}.vtt`; + if (stream.type === "text") { + name = `text_${stream.language}.vtt`; outputOptions.push(...getTextOutputOptions()); } @@ -112,24 +84,20 @@ async function runJob( job.updateProgress(100); - job.log(`Uploading ${outDir}/${name} to transcode/${params.assetId}/${name}`); + job.log( + `Uploading ${outDir}/${name} to transcode/${job.data.assetId}/${name}`, + ); - await uploadFile(`transcode/${params.assetId}/${name}`, `${outDir}/${name}`); + await uploadFile( + `transcode/${job.data.assetId}/${name}`, + `${outDir}/${name}`, + ); return { name, - stream: params.stream, + stream: job.data.stream, }; -} - -export default async function (job: Job) { - const tmpDir = new TmpDir(); - try { - return await runJob(job, tmpDir); - } finally { - await tmpDir.deleteAll(); - } -} +}; function getVideoOutputOptions( stream: Extract, @@ -206,35 +174,3 @@ function getTextOutputOptions() { const args: string[] = ["-f webvtt"]; return args; } - -function getMaxHeight(info: FFprobeResult) { - return info.streams.reduce((acc, stream) => { - if (!stream.height) { - return acc; - } - return acc > stream.height ? acc : stream.height; - }, 0); -} - -async function getInput(job: Job, tmpDir: TmpDir, input: Input) { - const filePath = parseFilePath(input.path); - - // If the input is on S3, download the file locally. 
- if (filePath.dir.startsWith("s3://")) { - const inDir = await tmpDir.create(); - - job.log(`Download "${filePath.path}" to "${inDir}"`); - await downloadFile(inDir, filePath.path.replace("s3://", "")); - - return parseFilePath(`${inDir}/${filePath.basename}`); - } - - if ( - filePath.dir.startsWith("http://") || - filePath.dir.startsWith("https://") - ) { - return filePath; - } - - throw new Error("Failed to resolve input path"); -} diff --git a/packages/artisan/src/consumer/workers/ffprobe.ts b/packages/artisan/src/consumer/workers/ffprobe.ts new file mode 100644 index 00000000..0311db7c --- /dev/null +++ b/packages/artisan/src/consumer/workers/ffprobe.ts @@ -0,0 +1,93 @@ +import { FFmpeggy } from "ffmpeggy"; +import { getBinaryPath, getInputPath } from "../helpers"; +import type { PartialInput } from "../../types"; +import type { WorkerCallback } from "../lib/worker-processor"; + +const ffprobeBin = await getBinaryPath("ffprobe"); + +FFmpeggy.DefaultConfig = { + ...FFmpeggy.DefaultConfig, + ffprobeBin, +}; + +type VideoInfo = { + height?: number; + framerate?: number; +}; + +type AudioInfo = { + language?: string; + channels?: number; +}; + +export type FfprobeData = { + inputs: PartialInput[]; + parentSortIndex: number; +}; + +export type FfprobeResult = { + video: Record; + audio: Record; +}; + +export const ffprobeCallback: WorkerCallback< + FfprobeData, + FfprobeResult +> = async ({ job, dir }) => { + const result: FfprobeResult = { + video: {}, + audio: {}, + }; + + const tempDir = await dir.createTempDir(); + + for (const input of job.data.inputs) { + const file = await getInputPath(input, tempDir); + const info = await FFmpeggy.probe(file.path); + + if (input.type === "video") { + const stream = info.streams.find( + (stream) => stream.codec_type === "video", + ); + + const framerate = stream?.avg_frame_rate + ? 
parseFrameRate(stream.avg_frame_rate) + : undefined; + + result.video[input.path] = { + height: stream?.height, + framerate, + }; + } + + if (input.type === "audio") { + const stream = info.streams.find( + (stream) => stream.codec_type === "audio", + ); + result.audio[input.path] = { + language: stream?.tags.language, + channels: stream?.channels, + }; + } + + job.log(`${input.path}: ${JSON.stringify(info)}`); + } + + return result; +}; + +function parseFrameRate(avg: string) { + const fraction = avg.split("/"); + + if (fraction[1]?.endsWith("|")) { + fraction[1] = fraction[1].substring(0, fraction[1].length - 1); + } + + if (fraction[0] && fraction[1]) { + return +fraction[0] / +fraction[1]; + } + + if (fraction[0]) { + return +fraction[0]; + } +} diff --git a/packages/artisan/src/consumer/workers/helpers.ts b/packages/artisan/src/consumer/workers/helpers.ts deleted file mode 100644 index 9d129c2b..00000000 --- a/packages/artisan/src/consumer/workers/helpers.ts +++ /dev/null @@ -1,3 +0,0 @@ -export const JOB_SKIPPED = "__JOB_SKIPPED__"; - -export type SkippableJobResult = typeof JOB_SKIPPED | T; diff --git a/packages/artisan/src/consumer/workers/index.ts b/packages/artisan/src/consumer/workers/index.ts index 801eab37..16dff8d7 100644 --- a/packages/artisan/src/consumer/workers/index.ts +++ b/packages/artisan/src/consumer/workers/index.ts @@ -1,13 +1,33 @@ import { Worker } from "bullmq"; import { connection } from "../env"; -import transcodeFn from "./transcode"; -import packageFn from "./package"; -import ffmpegFn from "./ffmpeg"; +import { transcodeCallback } from "./transcode"; +import { packageCallback } from "./package"; +import { ffmpegCallback } from "./ffmpeg"; +import { ffprobeCallback } from "./ffprobe"; +import { createWorkerProcessor } from "../lib/worker-processor"; + +const transcodeProcessor = createWorkerProcessor(transcodeCallback); +const packageProcessor = createWorkerProcessor(packageCallback); +const ffmpegProcessor = createWorkerProcessor(ffmpegCallback); +const ffprobeProcessor = createWorkerProcessor(ffprobeCallback); const workers = [ - new Worker("transcode", transcodeFn, { connection, autorun: false }), - new Worker("package", packageFn, { connection, autorun: false }), - new Worker("ffmpeg", ffmpegFn, { connection, autorun: false }), + new Worker("transcode", transcodeProcessor, { + connection, + autorun: false, + }), + new Worker("package", packageProcessor, { + connection, + autorun: false, + }), + new Worker("ffmpeg", ffmpegProcessor, { + connection, + autorun: false, + }), + new Worker("ffprobe", ffprobeProcessor, { + connection, + autorun: false, + }), ]; async function gracefulShutdown() { diff --git a/packages/artisan/src/consumer/workers/package.ts b/packages/artisan/src/consumer/workers/package.ts index c859b980..dd45fd64 100644 --- a/packages/artisan/src/consumer/workers/package.ts +++ b/packages/artisan/src/consumer/workers/package.ts @@ -2,53 +2,47 @@ import { execa } from "execa"; import { lookup } from "mime-types"; import parseFilePath from "parse-filepath"; import { downloadFolder, uploadFolder } from "../s3"; -import { TmpDir } from "../tmp-dir"; import { getMeta } from "../meta"; import { getBinaryPath } from "../helpers"; -import type { Job } from "bullmq"; +import type { WorkerCallback } from "../lib/worker-processor"; import type { Stream } from "../../types"; const packagerBin = await getBinaryPath("packager"); export type PackageData = { - params: { - assetId: string; - segmentSize?: number; - name: string; - }; - metadata: { - tag?: 
string; - }; + assetId: string; + segmentSize?: number; + name: string; + tag?: string; }; export type PackageResult = { assetId: string; }; -async function runJob( - job: Job, - tmpDir: TmpDir, -): Promise { - const { params } = job.data; +export const packageCallback: WorkerCallback< + PackageData, + PackageResult +> = async ({ job, dir }) => { + const inDir = await dir.createTempDir(); - const inDir = await tmpDir.create(); - await downloadFolder(`transcode/${params.assetId}`, inDir); + await downloadFolder(`transcode/${job.data.assetId}`, inDir); job.log(`Synced folder in ${inDir}`); - const metaFile = await getMeta(inDir); + const meta = await getMeta(inDir); - job.log(`Got meta file: "${JSON.stringify(metaFile)}"`); + job.log(`Got meta: "${JSON.stringify(meta)}"`); // If we do not specify the segmentSize, grab it from the meta file. - const segmentSize = params.segmentSize ?? metaFile.segmentSize; + const segmentSize = job.data.segmentSize ?? meta.segmentSize; - const outDir = await tmpDir.create(); + const outDir = await dir.createTempDir(); const packagerParams: string[][] = []; - for (const key of Object.keys(metaFile.streams)) { - const stream = metaFile.streams[key]; + const entries = Object.entries(meta.streams); + entries.forEach(([key, stream]) => { const file = parseFilePath(key); if (stream.type === "video") { @@ -86,7 +80,7 @@ async function runJob( `language=${stream.language}`, ]); } - } + }); const packagerArgs = packagerParams.map((it) => `${it.join(",")}`); @@ -107,7 +101,7 @@ async function runJob( detached: false, }); - const s3Dir = `package/${params.assetId}/${params.name}`; + const s3Dir = `package/${job.data.assetId}/${job.data.name}`; job.log(`Uploading to ${s3Dir}`); await uploadFolder(outDir, s3Dir, { @@ -121,9 +115,9 @@ async function runJob( job.updateProgress(100); return { - assetId: params.assetId, + assetId: job.data.assetId, }; -} +}; function getGroupId( stream: @@ -154,12 +148,3 @@ function getName( return `${stream.language}`; } } - -export default async function (job: Job) { - const tmpDir = new TmpDir(); - try { - return await runJob(job, tmpDir); - } finally { - await tmpDir.deleteAll(); - } -} diff --git a/packages/artisan/src/consumer/workers/transcode.ts b/packages/artisan/src/consumer/workers/transcode.ts index 25b27046..c1d4e412 100644 --- a/packages/artisan/src/consumer/workers/transcode.ts +++ b/packages/artisan/src/consumer/workers/transcode.ts @@ -1,85 +1,331 @@ -import { addPackageJob } from "../../producer"; -import { getFakeJob } from "../helpers"; +import { WaitingChildrenError } from "bullmq"; +import { randomUUID } from "crypto"; +import { getLangCode } from "shared/lang"; +import { ffprobeQueue, ffmpegQueue } from "../../producer"; import { uploadJson } from "../s3"; -import { JOB_SKIPPED } from "./helpers"; +import { assert } from "../../assert"; +import { getDefaultAudioBitrate, getDefaultVideoBitrate } from "../../defaults"; +import type { Input, PartialInput, Stream, PartialStream } from "../../types"; +import type { Job } from "bullmq"; +import type { FfprobeResult } from "./ffprobe"; import type { FfmpegResult } from "./ffmpeg"; -import type { Stream } from "../../types"; import type { Meta } from "../meta"; -import type { Job } from "bullmq"; -import type { SkippableJobResult } from "./helpers"; +import type { WorkerCallback } from "../lib/worker-processor"; export type TranscodeData = { - params: { - assetId: string; - segmentSize: number; - packageAfter: boolean; - }; - metadata: { - tag?: string; - }; + assetId: string; 
+ inputs: PartialInput[]; + streams: PartialStream[]; + segmentSize: number; + packageAfter: boolean; + tag?: string; + step?: Step; }; -export type TranscodeResult = SkippableJobResult<{ +export type TranscodeResult = { assetId: string; -}>; - -/** - * The transcode job relies on the underlying ffmpeg jobs. It waits until these - * are finished, and handles the meta.json file. - * @param job - * @returns - */ -export default async function ( - job: Job, -): Promise { - const { params, metadata } = job.data; - - const fakeJob = await getFakeJob(job); - - const childrenValues = await fakeJob.getChildrenValues(); - - const streams = Object.entries(childrenValues).reduce>( - (acc, [key, value]) => { - if (key.startsWith("bull:ffmpeg")) { - const result: FfmpegResult = value; - if (result === JOB_SKIPPED) { - // We skipped this job, bail out early. - return acc; - } - acc[result.name] = result.stream; +}; + +enum Step { + Initial, + Ffmpeg, + Meta, + Finish, +} + +export const transcodeCallback: WorkerCallback< + TranscodeData, + TranscodeResult +> = async ({ job, token }) => { + let step = job.data.step ?? Step.Initial; + while (step !== Step.Finish) { + switch (step) { + case Step.Initial: { + await handleStepInitial(job); + await job.updateData({ + ...job.data, + step: Step.Ffmpeg, + }); + step = Step.Ffmpeg; + break; } - return acc; + + case Step.Ffmpeg: { + await handleStepFfmpeg(job, token); + await job.updateData({ + ...job.data, + step: Step.Meta, + }); + step = Step.Meta; + break; + } + + case Step.Meta: { + await handleStepMeta(job, token); + await job.updateData({ + ...job.data, + step: Step.Finish, + }); + step = Step.Finish; + break; + } + } + } + + return { + assetId: job.data.assetId, + }; +}; + +async function handleStepInitial(job: Job) { + const inputs = job.data.inputs.filter( + (input) => input.type === "video" || input.type === "audio", + ); + + assert(job.id); + await ffprobeQueue.add( + "ffprobe", + { + inputs, + parentSortIndex: 0, + }, + { + jobId: `ffprobe_${randomUUID()}`, + failParentOnFailure: true, + parent: { + id: job.id, + queue: job.queueQualifiedName, + }, }, - {}, ); +} - if (!Object.keys(streams).length) { - job.log("Skip transcode, no streams found"); - return JOB_SKIPPED; - } +async function handleStepFfmpeg(job: Job, token?: string) { + await waitForChildren(job, token); + + const [probeResult] = await getChildren(job, "ffprobe"); + assert(probeResult); + + const inputs = job.data.inputs.map((partial) => + mergeInput(partial, probeResult), + ); + + let idx = 1; + job.data.streams.forEach((partial) => { + const match = matchInputForStream(partial, inputs); + if (!match) { + return; + } + + job.log( + `Match found for "${JSON.stringify(match.stream)}": ${JSON.stringify(match.input)}`, + ); + + assert(job.id); + ffmpegQueue.add( + getFfmpegJobName(match.stream), + { + input: match.input, + stream: match.stream, + segmentSize: job.data.segmentSize, + assetId: job.data.assetId, + parentSortIndex: idx, + }, + { + jobId: `ffmpeg_${randomUUID()}`, + failParentOnFailure: true, + parent: { + id: job.id, + queue: job.queueQualifiedName, + }, + }, + ); + + idx++; + }); +} + +async function handleStepMeta(job: Job, token?: string) { + await waitForChildren(job, token); + + const children = await getChildren(job, "ffmpeg"); + + const streams = children.reduce>((acc, child) => { + acc[child.name] = child.stream; + return acc; + }, {}); const meta: Meta = { version: 1, streams, - segmentSize: params.segmentSize, + segmentSize: job.data.segmentSize, }; await 
job.log(`Writing meta.json (${JSON.stringify(meta)})`); - await uploadJson(`transcode/${params.assetId}/meta.json`, meta); + await uploadJson(`transcode/${job.data.assetId}/meta.json`, meta); +} - if (params.packageAfter) { - await job.log("Will queue package job"); - await addPackageJob({ - assetId: params.assetId, - segmentSize: params.segmentSize, - tag: metadata.tag, - }); +async function waitForChildren(job: Job, token?: string) { + assert(token); + const shouldWait = await job.moveToWaitingChildren(token); + if (shouldWait) { + throw new WaitingChildrenError(); } +} - job.updateProgress(100); +async function getChildren(job: Job, name: string) { + const childrenValues = await job.getChildrenValues(); + const entries = Object.entries(childrenValues); - return { - assetId: params.assetId, - }; + return entries.reduce((acc, [key, value]) => { + if (!key.startsWith(`bull:${name}`)) { + return acc; + } + acc.push(value as T); + return acc; + }, []); +} + +function getFfmpegJobName(stream: Stream) { + const params: string[] = [stream.type]; + + if (stream.type === "video") { + params.push(stream.height.toString()); + } + if (stream.type === "audio" || stream.type === "text") { + params.push(stream.language); + } + + return `ffmpeg(${params.join(",")})`; +} + +function mergeInput(partial: PartialInput, probeResult: FfprobeResult): Input { + switch (partial.type) { + case "video": { + const info = probeResult.video[partial.path]; + assert(info); + + const height = partial.height ?? info.height; + assert(height, "Failed to retrieve height"); + + const framerate = partial.framerate ?? info.framerate; + assert(framerate, "Failed to retrieve framerate"); + + return { + type: "video", + path: partial.path, + height, + framerate, + }; + } + + case "audio": { + const info = probeResult.audio[partial.path]; + assert(info); + + const language = partial.language ?? getLangCode(info.language); + assert(language, "Failed to retrieve language"); + + // Assume when no channel metadata is found, we'll fallback to 2. + const channels = partial.channels ?? info.channels ?? 2; + + return { + type: "audio", + path: partial.path, + language, + channels, + }; + } + + case "text": + return partial; + } +} + +type MatchItem = { + type: T; + stream: Extract; + input: Extract; +}; + +type MatchResult = MatchItem<"video"> | MatchItem<"audio"> | MatchItem<"text">; + +function mergeStream(partial: PartialStream, input: Input): MatchResult | null { + if (partial.type === "video" && input.type === "video") { + const bitrate = + partial.bitrate ?? getDefaultVideoBitrate(partial.height, partial.codec); + + assert(bitrate); + + const stream: Extract = { + ...partial, + bitrate, + framerate: partial.framerate ?? input.framerate, + }; + return { type: "video", stream, input }; + } + if (partial.type === "audio" && input.type === "audio") { + const channels = partial.channels ?? input.channels; + const bitrate = + partial.bitrate ?? getDefaultAudioBitrate(channels, partial.codec); + + assert(bitrate); + + const stream: Extract = { + ...partial, + bitrate, + language: partial.language ?? 
input.language, + channels, + }; + return { type: "audio", stream, input }; + } + if (partial.type === "text" && input.type === "text") { + const stream: Extract = { + ...partial, + }; + return { type: "text", stream, input }; + } + return null; +} + +function matchInputForStream( + partial: PartialStream, + inputs: Input[], +): MatchResult | null { + const mergedStreams = inputs.map((input) => mergeStream(partial, input)); + + for (const mergedStream of mergedStreams) { + if (!mergedStream) { + continue; + } + + const { type, stream, input } = mergedStream; + + if (type === "video") { + if (stream.height > input.height) { + continue; + } + } + + if (type === "audio") { + if (stream.language !== input.language) { + continue; + } + + if (stream.channels > input.channels) { + continue; + } + } + + if (type === "text") { + if (stream.language !== input.language) { + continue; + } + } + + return mergedStream; + } + + return null; } diff --git a/packages/artisan/src/defaults.ts b/packages/artisan/src/defaults.ts new file mode 100644 index 00000000..67b33400 --- /dev/null +++ b/packages/artisan/src/defaults.ts @@ -0,0 +1,79 @@ +import type { AudioCodec, VideoCodec } from "shared/typebox"; + +// Check https://anton.lindstrom.io/gop-size-calculator/ +export const DEFAULT_SEGMENT_SIZE = 2.24; + +// We use the same default bitrates as shaka-streamer, thank you! +// https://github.com/shaka-project/shaka-streamer/blob/20c2704deacb402e39640408ac6157e94a5f78ba/streamer/bitrate_configuration.py + +const DEFAULT_AUDIO_BITRATE: { + [channels: number]: { + [codec in AudioCodec]: number; + }; +} = { + 2: { + aac: 128000, + ac3: 192000, + eac3: 96000, + }, + 6: { + aac: 256000, + ac3: 384000, + eac3: 192000, + }, +}; + +export function getDefaultAudioBitrate(channels: number, codec: AudioCodec) { + return DEFAULT_AUDIO_BITRATE[channels]?.[codec]; +} + +const DEFAULT_VIDEO_BITRATE: { + [height: number]: { + [codec in VideoCodec]: number; + }; +} = { + 144: { + h264: 108000, + hevc: 96000, + vp9: 96000, + }, + 240: { + h264: 242000, + hevc: 151000, + vp9: 151000, + }, + 360: { + h264: 400000, + hevc: 277000, + vp9: 277000, + }, + 480: { + h264: 1000000, + hevc: 512000, + vp9: 512000, + }, + 720: { + h264: 2000000, + hevc: 1000000, + vp9: 1000000, + }, + 1080: { + h264: 4000000, + hevc: 2000000, + vp9: 2000000, + }, + 1440: { + h264: 9000000, + hevc: 6000000, + vp9: 6000000, + }, + 2160: { + h264: 17000000, + hevc: 12000000, + vp9: 12000000, + }, +}; + +export function getDefaultVideoBitrate(height: number, codec: VideoCodec) { + return DEFAULT_VIDEO_BITRATE[height]?.[codec]; +} diff --git a/packages/artisan/src/producer/index.ts b/packages/artisan/src/producer/index.ts index b0fd9995..4a863470 100644 --- a/packages/artisan/src/producer/index.ts +++ b/packages/artisan/src/producer/index.ts @@ -1,11 +1,12 @@ -import { Queue, FlowProducer, Job } from "bullmq"; +import { Queue, FlowProducer } from "bullmq"; import { randomUUID } from "crypto"; import { connection } from "./env"; -import type { FlowChildJob } from "bullmq"; -import type { Input, Stream } from "../types"; +import { DEFAULT_SEGMENT_SIZE } from "../defaults"; +import type { PartialInput, PartialStream } from "../types"; import type { TranscodeData } from "../consumer/workers/transcode"; import type { PackageData } from "../consumer/workers/package"; import type { FfmpegData } from "../consumer/workers/ffmpeg"; +import type { FfprobeData } from "../consumer/workers/ffprobe"; export const flowProducer = new FlowProducer({ connection, @@ -19,7 +20,11 @@ 
const packageQueue = new Queue("package", { connection, }); -const ffmpegQueue = new Queue("ffmpeg", { +export const ffmpegQueue = new Queue("ffmpeg", { + connection, +}); + +export const ffprobeQueue = new Queue("ffprobe", { connection, }); @@ -27,12 +32,17 @@ const ffmpegQueue = new Queue("ffmpeg", { * Export all available queues so we can read them elsewhere, such as in api * where we can build job stats for each queue. */ -export const allQueus = [transcodeQueue, packageQueue, ffmpegQueue]; +export const allQueus = [ + transcodeQueue, + packageQueue, + ffmpegQueue, + ffprobeQueue, +]; type AddTranscodeJobData = { assetId?: string; - inputs: Input[]; - streams: Stream[]; + inputs: PartialInput[]; + streams: PartialStream[]; segmentSize?: number; packageAfter?: boolean; tag?: string; @@ -47,90 +57,31 @@ export async function addTranscodeJob({ assetId = randomUUID(), inputs, streams, - segmentSize = 4, + segmentSize = DEFAULT_SEGMENT_SIZE, packageAfter = false, tag, }: AddTranscodeJobData) { - const jobId = `transcode_${assetId}`; - - const pendingJob = await Job.fromId(transcodeQueue, jobId); - if (pendingJob) { - return pendingJob; - } - - let childJobIndex = 0; - const childJobs: FlowChildJob[] = []; - - for (const stream of streams) { - let input: Input | undefined; - - if (stream.type === "video") { - input = inputs.find((input) => input.type === "video"); - } - - if (stream.type === "audio") { - input = inputs.find( - (input) => input.type === "audio" && input.language === stream.language, - ); - } - - if (stream.type === "text") { - input = inputs.find( - (input) => input.type === "text" && input.language === stream.language, - ); - } - - if (input) { - const params: string[] = [stream.type]; - if (stream.type === "video") { - params.push(stream.height.toString()); - } - if (stream.type === "audio" || stream.type === "text") { - params.push(stream.language); - } - - childJobs.push({ - name: `ffmpeg(${params.join(",")})`, - data: { - params: { - input, - stream, - segmentSize, - assetId, - }, - metadata: { - parentSortKey: ++childJobIndex, - }, - } satisfies FfmpegData, - queueName: "ffmpeg", - opts: { - jobId: `ffmpeg_${randomUUID()}`, - failParentOnFailure: true, - }, - }); - } - } - - const { job } = await flowProducer.add({ - name: "transcode", - queueName: "transcode", - data: { - params: { - assetId, - segmentSize, - packageAfter, + return await transcodeQueue.add( + "transcode", + { + assetId, + inputs, + streams, + segmentSize, + packageAfter, + tag, + }, + { + jobId: `transcode_${assetId}`, + removeOnComplete: { + age: 3600 * 24 * 3, + count: 200, }, - metadata: { - tag, + removeOnFail: { + age: 3600 * 24 * 7, }, - } satisfies TranscodeData, - children: childJobs, - opts: { - jobId, }, - }); - - return job; + ); } type AddPackageJobData = { @@ -152,17 +103,20 @@ export async function addPackageJob({ return await packageQueue.add( "package", { - params: { - assetId, - segmentSize, - name, - }, - metadata: { - tag, - }, + assetId, + segmentSize, + name, + tag, }, { - jobId: `package_${randomUUID()}`, + jobId: `package_${assetId}_${name}`, + removeOnComplete: { + age: 3600 * 24 * 3, + count: 200, + }, + removeOnFail: { + age: 3600 * 24 * 7, + }, }, ); } diff --git a/packages/artisan/src/types.ts b/packages/artisan/src/types.ts index dda714fe..30463368 100644 --- a/packages/artisan/src/types.ts +++ b/packages/artisan/src/types.ts @@ -1,37 +1,44 @@ import type { LangCode, VideoCodec, AudioCodec } from "shared/typebox"; -export type Stream = +export type PartialStream = | { type: 
"video"; codec: VideoCodec; height: number; - bitrate: number; - framerate: number; + bitrate?: number; + framerate?: number; } | { type: "audio"; codec: AudioCodec; - bitrate: number; - language: LangCode; - channels: number; + bitrate?: number; + language?: LangCode; + channels?: number; } | { type: "text"; language: LangCode; }; -export type Input = +export type Stream = Required; + +export type PartialInput = | { type: "video"; path: string; + height?: number; + framerate?: number; } | { type: "audio"; path: string; - language: LangCode; + language?: LangCode; + channels?: number; } | { type: "text"; path: string; language: LangCode; }; + +export type Input = Required; diff --git a/packages/config/tsconfig.bun.json b/packages/config/tsconfig.bun.json index 75ad3c48..9685d616 100644 --- a/packages/config/tsconfig.bun.json +++ b/packages/config/tsconfig.bun.json @@ -18,6 +18,7 @@ "noFallthroughCasesInSwitch": true, "noUnusedLocals": true, "noUnusedParameters": true, - "noPropertyAccessFromIndexSignature": true + "noPropertyAccessFromIndexSignature": true, + "noUncheckedIndexedAccess": true } } diff --git a/packages/shared/package.json b/packages/shared/package.json index 5e50a5b5..26f6f454 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -6,7 +6,8 @@ "exports": { "./env": "./src/env.ts", "./scalar": "./src/scalar.ts", - "./typebox": "./src/typebox.ts" + "./typebox": "./src/typebox.ts", + "./lang": "./src/lang.ts" }, "scripts": { "lint": "eslint" diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts deleted file mode 100644 index 0f0d9cfa..00000000 --- a/packages/shared/src/index.ts +++ /dev/null @@ -1,5 +0,0 @@ -export * from "./env"; - -export * from "./typebox"; - -export * from "./scalar"; diff --git a/packages/shared/src/lang.ts b/packages/shared/src/lang.ts new file mode 100644 index 00000000..be21e73e --- /dev/null +++ b/packages/shared/src/lang.ts @@ -0,0 +1,11 @@ +import { by639_2T } from "iso-language-codes"; +import type { LangCode } from "./typebox"; + +export type { LangCode } from "./typebox"; + +export function getLangCode(value?: string) { + if (value && value in by639_2T) { + return value as LangCode; + } + return null; +} diff --git a/packages/stitcher/src/filters.ts b/packages/stitcher/src/filters.ts index ccf06751..efe6efa1 100644 --- a/packages/stitcher/src/filters.ts +++ b/packages/stitcher/src/filters.ts @@ -12,6 +12,10 @@ function getResolutionFilter( resolution: string, ): [number, (a: number, b: number) => boolean] { const [operator, value] = resolution.split(" "); + if (!value) { + throw new Error(`Failed to parse operator / value pair "${value}"`); + } + const height = parseInt(value, 10); const fn = diff --git a/packages/stitcher/src/parser/helpers.ts b/packages/stitcher/src/parser/helpers.ts index b416fc6f..93236e95 100644 --- a/packages/stitcher/src/parser/helpers.ts +++ b/packages/stitcher/src/parser/helpers.ts @@ -5,6 +5,9 @@ export function mapAttributes( const items = splitByCommaWithPreservingQuotes(param); items.forEach((item) => { const [key, value] = item.split("="); + if (key === undefined || value === undefined) { + return; + } callback(key, unquote(value)); }); } diff --git a/packages/stitcher/src/parser/lexical-parse.ts b/packages/stitcher/src/parser/lexical-parse.ts index b8249ec8..bbeb1382 100644 --- a/packages/stitcher/src/parser/lexical-parse.ts +++ b/packages/stitcher/src/parser/lexical-parse.ts @@ -93,6 +93,7 @@ function parseLine(line: string): Tag | null { case "EXTINF": { assert(param, 
"EXTINF: no param"); const chunks = param.split(","); + assert(chunks[0], "EXTINF: no duration in param"); return [ name, { @@ -114,6 +115,8 @@ function parseLine(line: string): Tag | null { case "RESOLUTION": { const chunks = value.split("x"); + assert(chunks[0], "EXT-X-STREAM-INF DURATION: no width"); + assert(chunks[1], "EXT-X-STREAM-INF DURATION: no height"); attrs.resolution = { width: parseFloat(chunks[0]), height: parseFloat(chunks[1]), diff --git a/packages/stitcher/src/parser/parse.ts b/packages/stitcher/src/parser/parse.ts index 15bdcb2a..0f0ad0c4 100644 --- a/packages/stitcher/src/parser/parse.ts +++ b/packages/stitcher/src/parser/parse.ts @@ -58,7 +58,7 @@ function formatMediaPlaylist(tags: Tag[]): MediaPlaylist { let segmentStart = index; const segmentEnd = index + 1; for (let i = index; i > 0; i--) { - if (tags[i][0] === "LITERAL") { + if (tags[i]?.[0] === "LITERAL") { segmentStart = i + 1; break; } @@ -188,7 +188,11 @@ function nextLiteral(tags: Tag[], index: number) { if (!tags[index + 1]) { throw new Error("Expecting next tag to be found"); } - const [name, value] = tags[index + 1]; + const tag = tags[index + 1]; + if (!tag) { + throw new Error(`Expected valid tag on ${index + 1}`); + } + const [name, value] = tag; if (name !== "LITERAL") { throw new Error("Expecting next tag to be a literal"); } diff --git a/packages/stitcher/src/parser/stringify.ts b/packages/stitcher/src/parser/stringify.ts index 766af3fe..6cda9779 100644 --- a/packages/stitcher/src/parser/stringify.ts +++ b/packages/stitcher/src/parser/stringify.ts @@ -1,3 +1,4 @@ +import { assert } from "../assert"; import { Lines } from "./lines"; import type { Rendition, @@ -41,6 +42,7 @@ function buildVariant(lines: Lines, variant: Variant) { } if (variant.audio.length) { + assert(variant.audio[0]); attrs.push(`AUDIO="${variant.audio[0].groupId}"`); for (const rendition of variant.audio) { buildRendition(lines, rendition); @@ -48,6 +50,7 @@ function buildVariant(lines: Lines, variant: Variant) { } if (variant.subtitles.length) { + assert(variant.subtitles[0]); attrs.push(`SUBTITLES="${variant.subtitles[0].groupId}"`); for (const rendition of variant.subtitles) { buildRendition(lines, rendition); diff --git a/packages/stitcher/src/playlist.ts b/packages/stitcher/src/playlist.ts index 440f7694..5ac8929f 100644 --- a/packages/stitcher/src/playlist.ts +++ b/packages/stitcher/src/playlist.ts @@ -36,7 +36,7 @@ export async function formatMediaPlaylist(sessionId: string, path: string) { const { mediaType, media } = await presentation.getMedia(path); - if (mediaType === "video" && media.endlist) { + if (mediaType === "video" && media.endlist && media.segments[0]) { // When we have an endlist, the playlist is static. We can check whether we need // to add dateRanges. 
media.segments[0].programDateTime = getStaticPDT(session); diff --git a/packages/stitcher/src/presentation.ts b/packages/stitcher/src/presentation.ts index 4838100d..e2aab86a 100644 --- a/packages/stitcher/src/presentation.ts +++ b/packages/stitcher/src/presentation.ts @@ -90,6 +90,8 @@ export class Presentation { async getDuration() { const master = await this.getMaster(); + + assert(master.variants[0], "No variants in master playlist"); const media = await this.getMedia_(master.variants[0].uri); return media.segments.reduce((acc, segment) => { diff --git a/packages/stitcher/src/url.ts b/packages/stitcher/src/url.ts index f482996f..4bb683a7 100644 --- a/packages/stitcher/src/url.ts +++ b/packages/stitcher/src/url.ts @@ -11,7 +11,7 @@ export function getMasterUrl(uri: string) { } const potentialUuid = uri.split("@", 2)[0]; - if (uuidRegex.test(potentialUuid)) { + if (potentialUuid && uuidRegex.test(potentialUuid)) { uri = `${ASSET_PROTOCOL}//${uri}`; } diff --git a/packages/stitcher/src/vast.ts b/packages/stitcher/src/vast.ts index 6686c08d..680b69c9 100644 --- a/packages/stitcher/src/vast.ts +++ b/packages/stitcher/src/vast.ts @@ -58,7 +58,6 @@ function scheduleForPackage(adMedia: AdMedia) { tag: "ad", assetId: adMedia.assetId, packageAfter: true, - segmentSize: 4, inputs: [ { path: adMedia.fileUrl, @@ -75,22 +74,16 @@ function scheduleForPackage(adMedia: AdMedia) { type: "video", codec: "h264", height: 720, - bitrate: 4000000, - framerate: 24, }, { type: "video", codec: "h264", height: 480, - bitrate: 1500000, - framerate: 24, }, { type: "audio", codec: "aac", - bitrate: 128000, language: "eng", - channels: 2, // TODO: Make optional and derive channels from input }, ], });
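
For reference, a minimal sketch of how a transcode job can be queued after this change. Because ffprobe now runs as the first child job of transcode, callers may omit per-stream bitrate, framerate, audio language and channel counts and let the worker derive them from the probed input or from the lookup tables in packages/artisan/src/defaults.ts (the same pattern the vast.ts hunk above switches to). The bucket path, import path and values below are illustrative only, not taken from the patch.

import { addTranscodeJob } from "./producer";

// Illustrative call: only codecs (plus a target height for video renditions)
// are specified. Missing bitrates, framerates, language and channels are
// filled in by the ffprobe child job and the defaults table.
await addTranscodeJob({
  inputs: [
    { type: "video", path: "s3://example-bucket/source.mp4" },
    { type: "audio", path: "s3://example-bucket/source.mp4" },
  ],
  streams: [
    { type: "video", codec: "h264", height: 720 },
    { type: "video", codec: "h264", height: 480 },
    { type: "audio", codec: "aac" },
  ],
  packageAfter: true,
});

Note that when a value cannot be derived at all (for example an audio input with no language tag and no override), the assert calls in the transcode worker fail the job rather than guessing.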