Skip to content

Commit

Permalink
feat: FFprobe input to extract defaults. (#79)
Browse files Browse the repository at this point in the history
* Added ffprobe worker

* Added ffprobe as first job for transcode

* Added ffprobe as child job of transcode

* Remove logs on complete

* Changed log stream to input

* No longer allow non strict indexes
  • Loading branch information
matvp91 authored Oct 23, 2024
1 parent a823560 commit 686a64d
Show file tree
Hide file tree
Showing 33 changed files with 743 additions and 390 deletions.
16 changes: 10 additions & 6 deletions packages/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ const app = new Elysia()
description:
"The source path, starting with http(s):// or s3://",
}),
height: t.Optional(t.Number()),
}),
t.Object({
type: t.Literal("audio"),
path: t.String({
description:
"The source path, starting with http(s):// or s3://",
}),
language: LangCodeSchema,
language: t.Optional(LangCodeSchema),
channels: t.Optional(t.Number()),
}),
t.Object({
type: t.Literal("text"),
Expand All @@ -97,15 +99,17 @@ const app = new Elysia()
type: t.Literal("video"),
codec: VideoCodecSchema,
height: t.Number(),
bitrate: t.Number({ description: "Bitrate in bps" }),
framerate: t.Number({ description: "Frames per second" }),
bitrate: t.Optional(t.Number({ description: "Bitrate in bps" })),
framerate: t.Optional(
t.Number({ description: "Frames per second" }),
),
}),
t.Object({
type: t.Literal("audio"),
codec: AudioCodecSchema,
bitrate: t.Number({ description: "Bitrate in bps" }),
language: LangCodeSchema,
channels: t.Number(),
bitrate: t.Optional(t.Number({ description: "Bitrate in bps" })),
language: t.Optional(LangCodeSchema),
channels: t.Optional(t.Number()),
}),
t.Object({
type: t.Literal("text"),
Expand Down
26 changes: 8 additions & 18 deletions packages/api/src/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ function findQueueByName(name: string): Queue {

function formatIdPair(id: string): [Queue, string] {
const queueName = id.split("_", 1)[0];
if (!queueName) {
throw new Error("Missing queueName as prefix when formatting id pair");
}
return [findQueueByName(queueName), id];
}

Expand Down Expand Up @@ -95,16 +98,16 @@ async function formatJobNode(node: JobNode): Promise<Job> {
progress = job.progress;
}

const state = mapJobState(await job.getState(), job.returnvalue);
const state = mapJobState(await job.getState());

const failedReason = state === "failed" ? job.failedReason : undefined;

const findParentSortKey = (job: BullMQJob): number => {
const value = job.data?.metadata?.parentSortKey;
const findParentSortIndex = (job: BullMQJob): number => {
const value = job.data?.parentSortIndex;
return typeof value === "number" ? value : 0;
};
(children ?? []).sort(
(a, b) => findParentSortKey(a.job) - findParentSortKey(b.job),
(a, b) => findParentSortIndex(a.job) - findParentSortIndex(b.job),
);

const jobChildren = await Promise.all((children ?? []).map(formatJobNode));
Expand Down Expand Up @@ -150,20 +153,7 @@ async function formatJobNode(node: JobNode): Promise<Job> {
};
}

// Keep these in sync with ocnsumer/workers/helpers.ts in artisan,
// we can treat the result as a string literal to indicate non standard
// job states such as "skipped".
type JobReturnValueStatus = "__JOB_SKIPPED__";

function mapJobState(
jobState: JobState | "unknown",
maybeReturnValue?: JobReturnValueStatus,
): Job["state"] {
// We pass maybeReturnValue as "any" from the input, it's not typed. But we
// can check whether it is a defined return value for non standard job states.
if (maybeReturnValue === "__JOB_SKIPPED__") {
return "skipped";
}
function mapJobState(jobState: JobState | "unknown"): Job["state"] {
if (jobState === "active" || jobState === "waiting-children") {
return "running";
}
Expand Down
1 change: 0 additions & 1 deletion packages/api/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ export const JobSchema = t.Recursive(
t.Literal("running"),
t.Literal("failed"),
t.Literal("completed"),
t.Literal("skipped"),
]),
progress: t.Number(),
createdOn: t.Number(),
Expand Down
4 changes: 0 additions & 4 deletions packages/app/src/components/JobState.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import Loader from "lucide-react/icons/loader";
import CircleDotDashed from "lucide-react/icons/circle-dot-dashed";
import Check from "lucide-react/icons/check";
import X from "lucide-react/icons/x";
import CircleOff from "lucide-react/icons/circle-off";
import { cn } from "@/lib/utils";
import type { Job } from "@/api";

Expand All @@ -16,9 +15,6 @@ export function JobState({ state }: { state: Job["state"] }) {
if (state === "running") {
return createCircle("bg-blue-200 text-blue-800", Loader, "animate-spin");
}
if (state === "skipped") {
return createCircle("bg-gray-200 text-gray-800", CircleOff);
}
return createCircle("bg-violet-200 text-violet-800", CircleDotDashed);
}

Expand Down
11 changes: 0 additions & 11 deletions packages/app/src/components/JobsStats.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ export function JobsStats({ jobs, filter, onChange }: JobsStatsProps) {
let completed = 0;
let failed = 0;
let running = 0;
let skipped = 0;

for (const job of jobs) {
if (job.state === "completed") {
Expand All @@ -25,9 +24,6 @@ export function JobsStats({ jobs, filter, onChange }: JobsStatsProps) {
if (job.state === "failed") {
failed += 1;
}
if (job.state === "skipped") {
skipped += 1;
}
}

const filterJobState = (state?: Job["state"]) => {
Expand Down Expand Up @@ -61,13 +57,6 @@ export function JobsStats({ jobs, filter, onChange }: JobsStatsProps) {
active={filter.state === "running"}
tooltip="Running"
/>
<JobStatsTile
value={skipped}
className="bg-gray-400"
onClick={() => filterJobState("skipped")}
active={filter.state === "skipped"}
tooltip="Skipped"
/>
</div>
</TooltipProvider>
);
Expand Down
4 changes: 3 additions & 1 deletion packages/app/src/pages/JobsPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ export function JobsPage() {
{filteredJobs.length ? (
<JobsList jobs={filteredJobs} />
) : (
<p className="text-center">No jobs found...</p>
<p className="text-center py-16 text-muted-foreground">
Nothing here but tumbleweeds... and they're not clickable.
</p>
)}
</div>
</div>
Expand Down
8 changes: 8 additions & 0 deletions packages/artisan/src/assert.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export function assert<T>(
value: T,
message: string = "value is null",
): asserts value is NonNullable<T> {
if (value === null || value === undefined) {
throw Error(message);
}
}
47 changes: 24 additions & 23 deletions packages/artisan/src/consumer/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,7 @@
import { Job, Queue } from "bullmq";
import { connection } from "./env";

/**
* Gets the full job. This is particularly handy when you need
* children values from child jobs.
* @param job Full job
* @returns
*/
export async function getFakeJob<T>(job: Job) {
if (!job.id) {
throw new Error("Missing job id");
}

const queue = new Queue(job.queueName, { connection });
const fakeJob = await Job.fromId<T>(queue, job.id);

if (!fakeJob) {
throw new Error("Failed to fetch fake job");
}

return fakeJob;
}
import parseFilepath from "parse-filepath";
import { downloadFile } from "./s3";
import { Dir } from "./lib/dir";
import type { PartialInput } from "../types";

export async function getBinaryPath(name: string) {
const direct = `${process.cwd()}/bin/${name}`;
Expand All @@ -40,3 +21,23 @@ export async function getBinaryPath(name: string) {
`Failed to get bin dep "${name}", run scripts/bin-deps.sh to install binary dependencies.`,
);
}

export async function getInputPath(input: PartialInput, dir: Dir | string) {
const filePath = parseFilepath(input.path);

// If the input is on S3, download the file locally.
if (filePath.dir.startsWith("s3://")) {
const inDir = dir instanceof Dir ? await dir.createTempDir() : dir;
await downloadFile(inDir, filePath.path.replace("s3://", ""));
return parseFilepath(`${inDir}/${filePath.basename}`);
}

if (
filePath.dir.startsWith("http://") ||
filePath.dir.startsWith("https://")
) {
return filePath;
}

throw new Error("Failed to resolve input path");
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import * as os from "node:os";
/**
* Manager for temporary directories on file system.
*/
export class TmpDir {
export class Dir {
private dirs_ = new Set<string>();

/**
* Create a new temporary directory.
* @returns
*/
async create() {
async createTempDir() {
const dir = await fs.mkdtemp(
path.join(os.tmpdir(), `superstreamer-${crypto.randomUUID()}`),
);
Expand Down
20 changes: 20 additions & 0 deletions packages/artisan/src/consumer/lib/worker-processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Dir } from "./dir";
import type { Job } from "bullmq";

export type WorkerCallback<T, R> = (params: {
job: Job<T, R>;
token?: string | undefined;
dir: Dir;
}) => Promise<R>;

export function createWorkerProcessor<T, R>(callback: WorkerCallback<T, R>) {
const dir = new Dir();

return async (job: Job<T, R>, token?: string) => {
try {
return await callback({ job, token, dir });
} finally {
await dir.deleteAll();
}
};
}
9 changes: 8 additions & 1 deletion packages/artisan/src/consumer/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,21 @@ export async function downloadFolder(key: string, path: string) {
* @param key S3 key
*/
export async function downloadFile(path: string, key: string) {
const name = `${path}/${basename(key)}`;

if (await Bun.file(name).exists()) {
// If the file already exists, we have nothing to do.
return;
}

const response = await client.send(
new GetObjectCommand({
Bucket: env.S3_BUCKET,
Key: key,
}),
);

await writeFile(`${path}/${basename(key)}`, response.Body as Readable);
await writeFile(name, response.Body as Readable);
}

/**
Expand Down
Loading

0 comments on commit 686a64d

Please sign in to comment.