Skip to content

Commit

Permalink
chore: Added tagging
Browse files Browse the repository at this point in the history
  • Loading branch information
matvp91 committed Aug 23, 2024
1 parent bf88dc9 commit b4dce92
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 74 deletions.
3 changes: 2 additions & 1 deletion packages/api/src/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ async function formatJobDto(job: Job): Promise<JobDto> {
inputData: JSON.stringify(job.data),
outputData: job.returnvalue ? JSON.stringify(job.returnvalue) : null,
failedReason: job.failedReason ?? null,
tag: extract(job.data, "metadata.tag", null),
};
}

Expand Down Expand Up @@ -75,7 +76,7 @@ async function formatJobNodeDto(node: JobNode): Promise<JobNodeDto> {
const children = node.children ?? [];

const findParentSortKey = (obj: unknown) =>
extract(obj, "data.parentSortKey", 0);
extract(obj, "data.metadata.parentSortKey", 0);
children.sort((a, b) => findParentSortKey(a.job) - findParentSortKey(b.job));

return {
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export type JobDto = {
inputData: string;
outputData: string | null;
failedReason: string | null;
tag: string | null;
};

export type JobNodeDto = {
Expand Down
66 changes: 39 additions & 27 deletions packages/artisan/src/consumer/workers/ffmpeg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import type { Stream, Input } from "../../schemas.js";
import type { FfmpegCommand } from "fluent-ffmpeg";

export type FfmpegData = {
parentSortKey?: number;
input: Input;
stream: Stream;
segmentSize: number;
assetId: string;
params: {
input: Input;
stream: Stream;
segmentSize: number;
assetId: string;
};
metadata: {
parentSortKey: number;
};
};

export type FfmpegResult = {
Expand All @@ -20,10 +24,10 @@ export type FfmpegResult = {
};

export default async function (job: Job<FfmpegData, FfmpegResult>) {
const { input, stream, segmentSize, assetId } = job.data;
const { params } = job.data;

const dir = dirSync();
let inputFile = parseFilePath(input.path);
let inputFile = parseFilePath(params.input.path);

if (inputFile.dir.startsWith("s3://")) {
const s3SourcePath = inputFile.path.replace("s3://", "");
Expand All @@ -39,56 +43,56 @@ export default async function (job: Job<FfmpegData, FfmpegResult>) {
let name: string | undefined;
let ffmpegCmd: FfmpegCommand | undefined;

if (stream.type === "video") {
const keyFrameInterval = segmentSize * stream.framerate;
if (params.stream.type === "video") {
const keyFrameInterval = params.segmentSize * params.stream.framerate;

let codec: string;
switch (stream.codec) {
switch (params.stream.codec) {
case "h264":
codec = "libx264";
break;
default:
codec = stream.codec;
codec = params.stream.codec;
break;
}

name = `video_${stream.height}_${stream.bitrate}_${stream.codec}.m4v`;
name = `video_${params.stream.height}_${params.stream.bitrate}_${params.stream.codec}.m4v`;

ffmpegCmd = ffmpeg(inputFile.path)
.noAudio()
.format("mp4")
.size(`?x${stream.height}`)
.size(`?x${params.stream.height}`)
.aspectRatio("16:9")
.autoPad(true)
.videoCodec(codec)
.videoBitrate(stream.bitrate)
.videoBitrate(params.stream.bitrate)
.outputOptions([
`-frag_duration ${segmentSize * 1e6}`,
`-frag_duration ${params.segmentSize * 1e6}`,
"-movflags +frag_keyframe",
`-r ${stream.framerate}`,
`-r ${params.stream.framerate}`,
`-keyint_min ${keyFrameInterval}`,
`-g ${keyFrameInterval}`,
])
.output(`${dir.name}/${name}`);
}

if (stream.type === "audio") {
name = `audio_${stream.language}_${stream.bitrate}.m4a`;
if (params.stream.type === "audio") {
name = `audio_${params.stream.language}_${params.stream.bitrate}.m4a`;

ffmpegCmd = ffmpeg(inputFile.path)
.noVideo()
.format("mp4")
.audioCodec(stream.codec)
.audioBitrate(stream.bitrate)
.audioCodec(params.stream.codec)
.audioBitrate(params.stream.bitrate)
.outputOptions([
`-metadata language=${stream.language}`,
`-frag_duration ${segmentSize * 1e6}`,
`-metadata language=${params.stream.language}`,
`-frag_duration ${params.segmentSize * 1e6}`,
])
.output(`${dir.name}/${name}`);
}

if (stream.type === "text") {
name = `text_${stream.language}.vtt`;
if (params.stream.type === "text") {
name = `text_${params.stream.language}.vtt`;

ffmpegCmd = ffmpeg(inputFile.path).output(`${dir.name}/${name}`);
}
Expand Down Expand Up @@ -116,9 +120,17 @@ export default async function (job: Job<FfmpegData, FfmpegResult>) {
.run();
});

job.log(`Uploading ${dir.name}/${name} to transcode/${assetId}/${name}`);
job.log(
`Uploading ${dir.name}/${name} to transcode/${params.assetId}/${name}`,
);

await uploadFile(`transcode/${assetId}/${name}`, `${dir.name}/${name}`);
await uploadFile(
`transcode/${params.assetId}/${name}`,
`${dir.name}/${name}`,
);

return { name, stream };
return {
name,
stream: params.stream,
};
}
30 changes: 18 additions & 12 deletions packages/artisan/src/consumer/workers/package.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ import type { Code } from "iso-language-codes";
const metaSchema = z.record(z.string(), streamSchema);

export type PackageData = {
assetId: string;
tag: string;
params: {
assetId: string;
};
metadata: {
tag: string;
};
};

export type PackageResult = {
Expand All @@ -28,23 +32,25 @@ function formatLanguage(code: Code) {
}

export default async function (job: Job<PackageData, PackageResult>) {
const { params } = job.data;

const dir = dirSync();
await downloadFolder(dir.name, `transcode/${job.data.assetId}`);
await downloadFolder(dir.name, `transcode/${params.assetId}`);

const metaFile = await metaSchema.parseAsync(
JSON.parse(await readFile(`${dir.name}/meta.json`, "utf8")),
);

const outDir = dirSync();

const params: string[][] = [];
const packagerParams: string[][] = [];

for (const key of Object.keys(metaFile)) {
const stream = metaFile[key];
const file = parseFilePath(key);

if (stream.type === "video") {
params.push([
packagerParams.push([
`in=${dir.name}/${key}`,
"stream=video",
`init_segment=${file.name}/init.mp4`,
Expand All @@ -55,7 +61,7 @@ export default async function (job: Job<PackageData, PackageResult>) {
}

if (stream.type === "audio") {
params.push([
packagerParams.push([
`in=${dir.name}/${key}`,
"stream=audio",
`init_segment=${file.name}/init.mp4`,
Expand All @@ -67,7 +73,7 @@ export default async function (job: Job<PackageData, PackageResult>) {
}

if (stream.type === "text") {
params.push([
packagerParams.push([
`in=${dir.name}/${key}`,
"stream=text",
`segment_template=${file.name}/$Number$.vtt`,
Expand All @@ -78,7 +84,7 @@ export default async function (job: Job<PackageData, PackageResult>) {
}
}

const packagerArgs = params.map((it) => `${it.join(",")}`);
const packagerArgs = packagerParams.map((it) => `${it.join(",")}`);

packagerArgs.push(
"--segment_duration",
Expand All @@ -102,7 +108,7 @@ export default async function (job: Job<PackageData, PackageResult>) {

await once(packagerProcess, "close");

await uploadFolder(outDir.name, `package/${job.data.assetId}/hls`, {
await uploadFolder(outDir.name, `package/${params.assetId}/hls`, {
del: true,
commandInput: (input) => ({
ContentType: lookup(input.Key) || "binary/octet-stream",
Expand All @@ -114,14 +120,14 @@ export default async function (job: Job<PackageData, PackageResult>) {
// becomes available on CDN.
// This way we ensure we have all the segments on S3 before we make the manifest available.
await copyFile(
`package/${job.data.assetId}/hls/master_tmp.m3u8`,
`package/${job.data.assetId}/hls/master.m3u8`,
`package/${params.assetId}/hls/master_tmp.m3u8`,
`package/${params.assetId}/hls/master.m3u8`,
"public-read",
);

job.updateProgress(100);

return {
assetId: job.data.assetId,
assetId: params.assetId,
};
}
22 changes: 14 additions & 8 deletions packages/artisan/src/consumer/workers/transcode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,22 @@ import type { FfmpegResult } from "./ffmpeg.js";
import type { Job } from "bullmq";

export type TranscodeData = {
assetId: string;
package: boolean;
tag: string;
params: {
assetId: string;
package: boolean;
};
metadata: {
tag: string;
};
};

export type TranscodeResult = {
assetId: string;
};

export default async function (job: Job<TranscodeData, TranscodeResult>) {
const { params, metadata } = job.data;

const fakeJob = await getFakeJob<TranscodeData>(job);

const childrenValues = await fakeJob.getChildrenValues();
Expand All @@ -34,21 +40,21 @@ export default async function (job: Job<TranscodeData, TranscodeResult>) {
await job.log(`Writing meta.json (${JSON.stringify(meta)})`);

await uploadJsonFile(
`transcode/${job.data.assetId}/meta.json`,
`transcode/${params.assetId}/meta.json`,
JSON.stringify(meta, null, 2),
);

if (job.data.package) {
if (params.package) {
await job.log("Will queue package job");
await addPackageJob({
assetId: job.data.assetId,
tag: job.data.tag,
assetId: params.assetId,
tag: metadata.tag,
});
}

job.updateProgress(100);

return {
assetId: job.data.assetId,
assetId: params.assetId,
};
}
32 changes: 22 additions & 10 deletions packages/artisan/src/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ export async function addTranscodeJob(data: AddTranscodeJobData) {
childJobs.push({
name: `ffmpeg(${params.join(",")})`,
data: {
parentSortKey: ++childJobIndex,
input,
stream,
segmentSize: data.segmentSize,
assetId: data.assetId,
params: {
input,
stream,
segmentSize: data.segmentSize,
assetId: data.assetId,
},
metadata: {
parentSortKey: ++childJobIndex,
},
} satisfies FfmpegData,
queueName: "ffmpeg",
opts: {
Expand All @@ -98,9 +102,13 @@ export async function addTranscodeJob(data: AddTranscodeJobData) {
name: "transcode",
queueName: "transcode",
data: {
assetId: data.assetId,
package: data.package,
tag: data.tag,
params: {
assetId: data.assetId,
package: data.package,
},
metadata: {
tag: data.tag,
},
} satisfies TranscodeData,
children: childJobs,
opts: {
Expand All @@ -120,8 +128,12 @@ export async function addPackageJob(data: AddPackageJobData) {
return await packageQueue.add(
"package",
{
assetId: data.assetId,
tag: data.tag,
params: {
assetId: data.assetId,
},
metadata: {
tag: data.tag,
},
} satisfies PackageData,
{
jobId: `package_${data.assetId}`,
Expand Down
1 change: 1 addition & 0 deletions packages/dashboard/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"tailwind-merge": "^2.4.0",
"tailwindcss-animate": "^1.0.7",
"timeago.js": "4.0.0-beta.3",
"uniqolor": "^1.1.1",
"zod": "^3.23.8"
},
"devDependencies": {
Expand Down
Loading

0 comments on commit b4dce92

Please sign in to comment.