From b4dce922f7b5190400d1c4b2c814ad5987f066a5 Mon Sep 17 00:00:00 2001 From: Matthias Van Parijs Date: Fri, 23 Aug 2024 09:20:34 +0200 Subject: [PATCH] chore: Added tagging --- packages/api/src/jobs.ts | 3 +- packages/api/src/types.ts | 1 + .../artisan/src/consumer/workers/ffmpeg.ts | 66 +++++++++++-------- .../artisan/src/consumer/workers/package.ts | 30 +++++---- .../artisan/src/consumer/workers/transcode.ts | 22 ++++--- packages/artisan/src/producer.ts | 32 ++++++--- packages/dashboard/package.json | 1 + packages/dashboard/src/components/JobTag.tsx | 41 ++++++++++++ .../dashboard/src/components/JobsList.tsx | 8 ++- packages/stitcher/src/contract.ts | 2 +- packages/stitcher/src/index.ts | 9 +-- packages/stitcher/src/playlist.ts | 10 ++- packages/stitcher/src/session.ts | 4 +- packages/stitcher/src/types.ts | 2 +- pnpm-lock.yaml | 7 ++ 15 files changed, 164 insertions(+), 74 deletions(-) create mode 100644 packages/dashboard/src/components/JobTag.tsx diff --git a/packages/api/src/jobs.ts b/packages/api/src/jobs.ts index da91db1f..ab43a3bc 100644 --- a/packages/api/src/jobs.ts +++ b/packages/api/src/jobs.ts @@ -47,6 +47,7 @@ async function formatJobDto(job: Job): Promise { inputData: JSON.stringify(job.data), outputData: job.returnvalue ? JSON.stringify(job.returnvalue) : null, failedReason: job.failedReason ?? null, + tag: extract(job.data, "metadata.tag", null), }; } @@ -75,7 +76,7 @@ async function formatJobNodeDto(node: JobNode): Promise { const children = node.children ?? []; const findParentSortKey = (obj: unknown) => - extract(obj, "data.parentSortKey", 0); + extract(obj, "data.metadata.parentSortKey", 0); children.sort((a, b) => findParentSortKey(a.job) - findParentSortKey(b.job)); return { diff --git a/packages/api/src/types.ts b/packages/api/src/types.ts index ad0de55d..8129f8fa 100644 --- a/packages/api/src/types.ts +++ b/packages/api/src/types.ts @@ -11,6 +11,7 @@ export type JobDto = { inputData: string; outputData: string | null; failedReason: string | null; + tag: string | null; }; export type JobNodeDto = { diff --git a/packages/artisan/src/consumer/workers/ffmpeg.ts b/packages/artisan/src/consumer/workers/ffmpeg.ts index 03cf80fe..65c1d393 100644 --- a/packages/artisan/src/consumer/workers/ffmpeg.ts +++ b/packages/artisan/src/consumer/workers/ffmpeg.ts @@ -7,11 +7,15 @@ import type { Stream, Input } from "../../schemas.js"; import type { FfmpegCommand } from "fluent-ffmpeg"; export type FfmpegData = { - parentSortKey?: number; - input: Input; - stream: Stream; - segmentSize: number; - assetId: string; + params: { + input: Input; + stream: Stream; + segmentSize: number; + assetId: string; + }; + metadata: { + parentSortKey: number; + }; }; export type FfmpegResult = { @@ -20,10 +24,10 @@ export type FfmpegResult = { }; export default async function (job: Job) { - const { input, stream, segmentSize, assetId } = job.data; + const { params } = job.data; const dir = dirSync(); - let inputFile = parseFilePath(input.path); + let inputFile = parseFilePath(params.input.path); if (inputFile.dir.startsWith("s3://")) { const s3SourcePath = inputFile.path.replace("s3://", ""); @@ -39,56 +43,56 @@ export default async function (job: Job) { let name: string | undefined; let ffmpegCmd: FfmpegCommand | undefined; - if (stream.type === "video") { - const keyFrameInterval = segmentSize * stream.framerate; + if (params.stream.type === "video") { + const keyFrameInterval = params.segmentSize * params.stream.framerate; let codec: string; - switch (stream.codec) { + switch (params.stream.codec) { case "h264": codec = "libx264"; break; default: - codec = stream.codec; + codec = params.stream.codec; break; } - name = `video_${stream.height}_${stream.bitrate}_${stream.codec}.m4v`; + name = `video_${params.stream.height}_${params.stream.bitrate}_${params.stream.codec}.m4v`; ffmpegCmd = ffmpeg(inputFile.path) .noAudio() .format("mp4") - .size(`?x${stream.height}`) + .size(`?x${params.stream.height}`) .aspectRatio("16:9") .autoPad(true) .videoCodec(codec) - .videoBitrate(stream.bitrate) + .videoBitrate(params.stream.bitrate) .outputOptions([ - `-frag_duration ${segmentSize * 1e6}`, + `-frag_duration ${params.segmentSize * 1e6}`, "-movflags +frag_keyframe", - `-r ${stream.framerate}`, + `-r ${params.stream.framerate}`, `-keyint_min ${keyFrameInterval}`, `-g ${keyFrameInterval}`, ]) .output(`${dir.name}/${name}`); } - if (stream.type === "audio") { - name = `audio_${stream.language}_${stream.bitrate}.m4a`; + if (params.stream.type === "audio") { + name = `audio_${params.stream.language}_${params.stream.bitrate}.m4a`; ffmpegCmd = ffmpeg(inputFile.path) .noVideo() .format("mp4") - .audioCodec(stream.codec) - .audioBitrate(stream.bitrate) + .audioCodec(params.stream.codec) + .audioBitrate(params.stream.bitrate) .outputOptions([ - `-metadata language=${stream.language}`, - `-frag_duration ${segmentSize * 1e6}`, + `-metadata language=${params.stream.language}`, + `-frag_duration ${params.segmentSize * 1e6}`, ]) .output(`${dir.name}/${name}`); } - if (stream.type === "text") { - name = `text_${stream.language}.vtt`; + if (params.stream.type === "text") { + name = `text_${params.stream.language}.vtt`; ffmpegCmd = ffmpeg(inputFile.path).output(`${dir.name}/${name}`); } @@ -116,9 +120,17 @@ export default async function (job: Job) { .run(); }); - job.log(`Uploading ${dir.name}/${name} to transcode/${assetId}/${name}`); + job.log( + `Uploading ${dir.name}/${name} to transcode/${params.assetId}/${name}`, + ); - await uploadFile(`transcode/${assetId}/${name}`, `${dir.name}/${name}`); + await uploadFile( + `transcode/${params.assetId}/${name}`, + `${dir.name}/${name}`, + ); - return { name, stream }; + return { + name, + stream: params.stream, + }; } diff --git a/packages/artisan/src/consumer/workers/package.ts b/packages/artisan/src/consumer/workers/package.ts index 958705dd..dc6f0198 100644 --- a/packages/artisan/src/consumer/workers/package.ts +++ b/packages/artisan/src/consumer/workers/package.ts @@ -15,8 +15,12 @@ import type { Code } from "iso-language-codes"; const metaSchema = z.record(z.string(), streamSchema); export type PackageData = { - assetId: string; - tag: string; + params: { + assetId: string; + }; + metadata: { + tag: string; + }; }; export type PackageResult = { @@ -28,8 +32,10 @@ function formatLanguage(code: Code) { } export default async function (job: Job) { + const { params } = job.data; + const dir = dirSync(); - await downloadFolder(dir.name, `transcode/${job.data.assetId}`); + await downloadFolder(dir.name, `transcode/${params.assetId}`); const metaFile = await metaSchema.parseAsync( JSON.parse(await readFile(`${dir.name}/meta.json`, "utf8")), @@ -37,14 +43,14 @@ export default async function (job: Job) { const outDir = dirSync(); - const params: string[][] = []; + const packagerParams: string[][] = []; for (const key of Object.keys(metaFile)) { const stream = metaFile[key]; const file = parseFilePath(key); if (stream.type === "video") { - params.push([ + packagerParams.push([ `in=${dir.name}/${key}`, "stream=video", `init_segment=${file.name}/init.mp4`, @@ -55,7 +61,7 @@ export default async function (job: Job) { } if (stream.type === "audio") { - params.push([ + packagerParams.push([ `in=${dir.name}/${key}`, "stream=audio", `init_segment=${file.name}/init.mp4`, @@ -67,7 +73,7 @@ export default async function (job: Job) { } if (stream.type === "text") { - params.push([ + packagerParams.push([ `in=${dir.name}/${key}`, "stream=text", `segment_template=${file.name}/$Number$.vtt`, @@ -78,7 +84,7 @@ export default async function (job: Job) { } } - const packagerArgs = params.map((it) => `${it.join(",")}`); + const packagerArgs = packagerParams.map((it) => `${it.join(",")}`); packagerArgs.push( "--segment_duration", @@ -102,7 +108,7 @@ export default async function (job: Job) { await once(packagerProcess, "close"); - await uploadFolder(outDir.name, `package/${job.data.assetId}/hls`, { + await uploadFolder(outDir.name, `package/${params.assetId}/hls`, { del: true, commandInput: (input) => ({ ContentType: lookup(input.Key) || "binary/octet-stream", @@ -114,14 +120,14 @@ export default async function (job: Job) { // becomes available on CDN. // This way we ensure we have all the segments on S3 before we make the manifest available. await copyFile( - `package/${job.data.assetId}/hls/master_tmp.m3u8`, - `package/${job.data.assetId}/hls/master.m3u8`, + `package/${params.assetId}/hls/master_tmp.m3u8`, + `package/${params.assetId}/hls/master.m3u8`, "public-read", ); job.updateProgress(100); return { - assetId: job.data.assetId, + assetId: params.assetId, }; } diff --git a/packages/artisan/src/consumer/workers/transcode.ts b/packages/artisan/src/consumer/workers/transcode.ts index 9737c4a7..60421992 100644 --- a/packages/artisan/src/consumer/workers/transcode.ts +++ b/packages/artisan/src/consumer/workers/transcode.ts @@ -6,9 +6,13 @@ import type { FfmpegResult } from "./ffmpeg.js"; import type { Job } from "bullmq"; export type TranscodeData = { - assetId: string; - package: boolean; - tag: string; + params: { + assetId: string; + package: boolean; + }; + metadata: { + tag: string; + }; }; export type TranscodeResult = { @@ -16,6 +20,8 @@ export type TranscodeResult = { }; export default async function (job: Job) { + const { params, metadata } = job.data; + const fakeJob = await getFakeJob(job); const childrenValues = await fakeJob.getChildrenValues(); @@ -34,21 +40,21 @@ export default async function (job: Job) { await job.log(`Writing meta.json (${JSON.stringify(meta)})`); await uploadJsonFile( - `transcode/${job.data.assetId}/meta.json`, + `transcode/${params.assetId}/meta.json`, JSON.stringify(meta, null, 2), ); - if (job.data.package) { + if (params.package) { await job.log("Will queue package job"); await addPackageJob({ - assetId: job.data.assetId, - tag: job.data.tag, + assetId: params.assetId, + tag: metadata.tag, }); } job.updateProgress(100); return { - assetId: job.data.assetId, + assetId: params.assetId, }; } diff --git a/packages/artisan/src/producer.ts b/packages/artisan/src/producer.ts index 2307c45b..4c149aaf 100644 --- a/packages/artisan/src/producer.ts +++ b/packages/artisan/src/producer.ts @@ -80,11 +80,15 @@ export async function addTranscodeJob(data: AddTranscodeJobData) { childJobs.push({ name: `ffmpeg(${params.join(",")})`, data: { - parentSortKey: ++childJobIndex, - input, - stream, - segmentSize: data.segmentSize, - assetId: data.assetId, + params: { + input, + stream, + segmentSize: data.segmentSize, + assetId: data.assetId, + }, + metadata: { + parentSortKey: ++childJobIndex, + }, } satisfies FfmpegData, queueName: "ffmpeg", opts: { @@ -98,9 +102,13 @@ export async function addTranscodeJob(data: AddTranscodeJobData) { name: "transcode", queueName: "transcode", data: { - assetId: data.assetId, - package: data.package, - tag: data.tag, + params: { + assetId: data.assetId, + package: data.package, + }, + metadata: { + tag: data.tag, + }, } satisfies TranscodeData, children: childJobs, opts: { @@ -120,8 +128,12 @@ export async function addPackageJob(data: AddPackageJobData) { return await packageQueue.add( "package", { - assetId: data.assetId, - tag: data.tag, + params: { + assetId: data.assetId, + }, + metadata: { + tag: data.tag, + }, } satisfies PackageData, { jobId: `package_${data.assetId}`, diff --git a/packages/dashboard/package.json b/packages/dashboard/package.json index f3c8bad3..4c6356c7 100644 --- a/packages/dashboard/package.json +++ b/packages/dashboard/package.json @@ -38,6 +38,7 @@ "tailwind-merge": "^2.4.0", "tailwindcss-animate": "^1.0.7", "timeago.js": "4.0.0-beta.3", + "uniqolor": "^1.1.1", "zod": "^3.23.8" }, "devDependencies": { diff --git a/packages/dashboard/src/components/JobTag.tsx b/packages/dashboard/src/components/JobTag.tsx new file mode 100644 index 00000000..695cdc2c --- /dev/null +++ b/packages/dashboard/src/components/JobTag.tsx @@ -0,0 +1,41 @@ +import uniqolor from "uniqolor"; +import type { JobDto } from "@mixwave/api/client"; + +type JobTagProps = { + job: JobDto; +}; + +export function JobTag({ job }: JobTagProps) { + if (!job.tag) { + return null; + } + + if (job.tag === "default") { + return null; + } + + const { color } = uniqolor(job.tag, {}); + + return ( + + {job.tag} + + ); +} + +function hexToRGB(hex: string, alpha: number) { + return ( + "rgba(" + + parseInt(hex.slice(1, 3), 16) + + ", " + + parseInt(hex.slice(3, 5), 16) + + ", " + + parseInt(hex.slice(5, 7), 16) + + ", " + + alpha + + ")" + ); +} diff --git a/packages/dashboard/src/components/JobsList.tsx b/packages/dashboard/src/components/JobsList.tsx index e7e49e5f..af89f5ac 100644 --- a/packages/dashboard/src/components/JobsList.tsx +++ b/packages/dashboard/src/components/JobsList.tsx @@ -1,6 +1,7 @@ import { Link } from "react-router-dom"; import { JobState } from "./JobState"; import { getShortId, getTimeAgo } from "@/lib/helpers"; +import { JobTag } from "./JobTag"; import type { JobDto } from "@/lib/api"; type JobsListProps = { @@ -24,8 +25,11 @@ export function JobsList({ jobs }: JobsListProps) { {job.name} -
- {getTimeAgo(job.createdOn)} +
+ +
+ {getTimeAgo(job.createdOn)} +
diff --git a/packages/stitcher/src/contract.ts b/packages/stitcher/src/contract.ts index 3720c23f..a3320013 100644 --- a/packages/stitcher/src/contract.ts +++ b/packages/stitcher/src/contract.ts @@ -4,7 +4,7 @@ import * as z from "zod"; const c = initContract(); export const postSessionBodySchema = z.object({ - url: z.string(), + assetId: z.string(), vmapUrl: z.string().optional(), ads: z .array( diff --git a/packages/stitcher/src/index.ts b/packages/stitcher/src/index.ts index 1c72ab2b..05c708a6 100644 --- a/packages/stitcher/src/index.ts +++ b/packages/stitcher/src/index.ts @@ -10,7 +10,6 @@ import { formatMediaPlaylist, formatInterstitialsJson, } from "./playlist.js"; -import parseFilepath from "parse-filepath"; async function buildServer() { const app = Fastify(); @@ -33,7 +32,7 @@ async function buildServer() { }, getMasterPlaylist: async ({ params, reply }) => { const session = await getSession(params.sessionId); - const response = await formatMasterPlaylist(session.url); + const response = await formatMasterPlaylist(session); reply.type("application/x-mpegURL"); @@ -44,12 +43,8 @@ async function buildServer() { }, getMediaPlaylist: async ({ params, reply }) => { const session = await getSession(params.sessionId); - const filePath = parseFilepath(session.url); - const response = await formatMediaPlaylist( - `${filePath.dir}/${params.path}/playlist.m3u8`, - session, - ); + const response = await formatMediaPlaylist(session, params.path); reply.type("application/x-mpegURL"); diff --git a/packages/stitcher/src/playlist.ts b/packages/stitcher/src/playlist.ts index 6757396f..cf9f32a2 100644 --- a/packages/stitcher/src/playlist.ts +++ b/packages/stitcher/src/playlist.ts @@ -3,7 +3,7 @@ import parseFilepath from "parse-filepath"; import { Interstitial } from "../extern/hls-parser/types.js"; import { env } from "./env.js"; import { MasterPlaylist, MediaPlaylist } from "../extern/hls-parser/types.js"; -import { Session } from "./types.js"; +import type { Session } from "./types.js"; type InterstitialAsset = { URI: string; @@ -16,13 +16,17 @@ async function fetchPlaylist(url: string) { return parse(text) as T; } -export async function formatMasterPlaylist(url: string) { +export async function formatMasterPlaylist(session: Session) { + const url = `${env.S3_PUBLIC_URL}/package/${session.assetId}/hls/master.m3u8`; + const master = await fetchPlaylist(url); return stringify(master); } -export async function formatMediaPlaylist(url: string, session: Session) { +export async function formatMediaPlaylist(session: Session, path: string) { + const url = `${env.S3_PUBLIC_URL}/package/${session.assetId}/hls/${path}/playlist.m3u8`; + const media = await fetchPlaylist(url); const filePath = parseFilepath(url); diff --git a/packages/stitcher/src/session.ts b/packages/stitcher/src/session.ts index c0402aa1..9a2249f4 100644 --- a/packages/stitcher/src/session.ts +++ b/packages/stitcher/src/session.ts @@ -8,7 +8,7 @@ const REDIS_PREFIX = `stitcher:session`; const key = (sessionId: string) => `${REDIS_PREFIX}:${sessionId}`; export async function createSession(data: { - url: string; + assetId: string; vmapUrl?: string; ads?: Ad[]; }) { @@ -24,7 +24,7 @@ export async function createSession(data: { const session = { id: sessionId, - url: data.url, + assetId: data.assetId, ads, } satisfies Session; diff --git a/packages/stitcher/src/types.ts b/packages/stitcher/src/types.ts index 4bd264ce..7343ddd1 100644 --- a/packages/stitcher/src/types.ts +++ b/packages/stitcher/src/types.ts @@ -1,6 +1,6 @@ export type Session = { id: string; - url: string; + assetId: string; ads: Ad[]; }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6da48281..0b683f0b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -223,6 +223,9 @@ importers: timeago.js: specifier: 4.0.0-beta.3 version: 4.0.0-beta.3 + uniqolor: + specifier: ^1.1.1 + version: 1.1.1 zod: specifier: ^3.23.8 version: 3.23.8 @@ -10246,6 +10249,10 @@ packages: vfile: 6.0.2 dev: false + /uniqolor@1.1.1: + resolution: {integrity: sha512-HUwezlXCwm5bzsEXW7AP7ybezH13uWENRgYT+3dOdhJPvpYucSqvIGckMiLn+Uy2j0NVf3fPp43uZ4aun3t4Ww==} + dev: false + /unique-string@3.0.0: resolution: {integrity: sha512-VGXBUVwxKMBUznyffQweQABPRRW1vHZAbadFZud4pLFAqRGvv/96vafgjWFqzourzr8YonlQiPgH0YCJfawoGQ==} engines: {node: '>=12'}