Skip to content

Commit

Permalink
Merge branch 'Chigala-refactor/lower-max-retries-for-specific-tasks'
Browse files Browse the repository at this point in the history
  • Loading branch information
ericallam committed Aug 28, 2023
2 parents a107824 + 773a6e2 commit 03721cb
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 14 deletions.
2 changes: 2 additions & 0 deletions apps/webapp/app/models/jobRunExecution.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export type EnqueueRunExecutionV2Options = {
runAt?: Date;
resumeTaskId?: string;
isRetry?: boolean;
skipRetrying?: boolean;
};

export async function enqueueRunExecutionV2(
Expand All @@ -47,6 +48,7 @@ export async function enqueueRunExecutionV2(
tx,
runAt: options.runAt,
jobKey: `job_run:${run.id}`,
maxAttempts: options.skipRetrying ? 1 : undefined,
}
);
}
Expand Down
14 changes: 12 additions & 2 deletions apps/webapp/app/platform/zodWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,13 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
): Promise<GraphileJob> {
const task = this.#tasks[identifier];

const optionsWithoutTx = omit(options ?? {}, ["tx"]);
const optionsWithoutTx = removeUndefinedKeys(omit(options ?? {}, ["tx"]));
const taskWithoutJobKey = omit(task, ["jobKey"]);

// Make sure options passed in to enqueue take precedence over task options
const spec = {
...optionsWithoutTx,
...taskWithoutJobKey,
...optionsWithoutTx,
};

if (typeof task.queueName === "function") {
Expand Down Expand Up @@ -437,3 +438,12 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
logger.debug(`[worker][${this.#name}] ${message}`, args);
}
}

function removeUndefinedKeys<T extends object>(obj: T): T {
for (let key in obj) {
if (Object.prototype.hasOwnProperty.call(obj, key) && obj[key] === undefined) {
delete obj[key];
}
}
return obj;
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ActionArgs, LoaderArgs, json } from "@remix-run/server-runtime";
import { RuntimeEnvironmentType } from "@trigger.dev/database";
import { z } from "zod";
import { PrismaClient, prisma } from "~/db.server";
import { logger } from "~/services/logger.server";
Expand Down Expand Up @@ -99,6 +100,9 @@ export class TriggerEndpointIndexHookService {
slug: endpointSlug,
},
},
include: {
environment: true,
},
});

if (!endpoint) {
Expand All @@ -122,6 +126,8 @@ export class TriggerEndpointIndexHookService {
},
{
runAt: new Date(Date.now() + 5000),
maxAttempts:
endpoint.environment.type === RuntimeEnvironmentType.DEVELOPMENT ? 1 : undefined,
}
);
}
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/services/endpointApi.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ export class EndpointApiError extends Error {
}

export class EndpointApi {
constructor(private apiKey: string, private url: string) {}
constructor(
private apiKey: string,
private url: string
) {}

async ping(endpointId: string): Promise<PongResponse> {
const response = await safeFetch(this.url, {
Expand Down
10 changes: 9 additions & 1 deletion apps/webapp/app/services/endpoints/createEndpoint.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { AuthenticatedEnvironment } from "../apiAuth.server";
import { EndpointApi } from "../endpointApi.server";
import { workerQueue } from "../worker.server";
import { env } from "~/env.server";
import { RuntimeEnvironmentType } from "@trigger.dev/database";

const indexingHookIdentifier = customAlphabet("0123456789abcdefghijklmnopqrstuvxyz", 10);

Expand Down Expand Up @@ -51,6 +52,9 @@ export class CreateEndpointService {
slug: id,
},
},
include: {
environment: true,
},
create: {
environment: {
connect: {
Expand Down Expand Up @@ -83,7 +87,11 @@ export class CreateEndpointService {
id: endpoint.id,
source: "INTERNAL",
},
{ tx }
{
tx,
maxAttempts:
endpoint.environment.type === RuntimeEnvironmentType.DEVELOPMENT ? 1 : undefined,
}
);

return endpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { AuthenticatedEnvironment } from "../apiAuth.server";
import { workerQueue } from "../worker.server";
import { CreateEndpointError } from "./createEndpoint.server";
import { EndpointApi } from "../endpointApi.server";
import { RuntimeEnvironmentType } from "@trigger.dev/database";

const indexingHookIdentifier = customAlphabet("0123456789abcdefghijklmnopqrstuvxyz", 10);

Expand Down Expand Up @@ -35,6 +36,9 @@ export class ValidateCreateEndpointService {
slug: validationResult.endpointId,
},
},
include: {
environment: true,
},
create: {
environment: {
connect: {
Expand Down Expand Up @@ -67,7 +71,11 @@ export class ValidateCreateEndpointService {
id: endpoint.id,
source: "INTERNAL",
},
{ tx }
{
tx,
maxAttempts:
endpoint.environment.type === RuntimeEnvironmentType.DEVELOPMENT ? 1 : undefined,
}
);

return endpoint;
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/services/events/ingestSendEvent.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import { logger } from "../logger.server";
export class IngestSendEvent {
#prismaClient: PrismaClientOrTransaction;

constructor(prismaClient: PrismaClientOrTransaction = prisma, private deliverEvents = true) {
constructor(
prismaClient: PrismaClientOrTransaction = prisma,
private deliverEvents = true
) {
this.#prismaClient = prismaClient;
}

Expand Down
8 changes: 7 additions & 1 deletion apps/webapp/app/services/runs/continueRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { RuntimeEnvironmentType } from "@trigger.dev/database";
import { $transaction, Prisma, PrismaClient, prisma } from "~/db.server";
import { enqueueRunExecutionV2 } from "~/models/jobRunExecution.server";

Expand All @@ -16,6 +17,9 @@ export class ContinueRunService {
async (tx) => {
const run = await tx.jobRun.findUniqueOrThrow({
where: { id: runId },
include: {
environment: true,
},
});

if (!RESUMABLE_STATUSES.includes(run.status)) {
Expand All @@ -35,7 +39,9 @@ export class ContinueRunService {
},
});

await enqueueRunExecutionV2(run, tx);
await enqueueRunExecutionV2(run, tx, {
skipRetrying: run.environment.type === RuntimeEnvironmentType.DEVELOPMENT,
});
},
{ timeout: 10000 }
);
Expand Down
12 changes: 9 additions & 3 deletions apps/webapp/app/services/runs/performRunExecutionV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
RunJobSuccess,
RunSourceContextSchema,
} from "@trigger.dev/core";
import type { Task } from "@trigger.dev/database";
import { RuntimeEnvironmentType, type Task } from "@trigger.dev/database";
import { generateErrorMessage } from "zod-error";
import { eventRecordToApiJson } from "~/api.server";
import { $transaction, PrismaClient, PrismaClientOrTransaction, prisma } from "~/db.server";
Expand Down Expand Up @@ -135,7 +135,9 @@ export class PerformRunExecutionV2Service {
},
});

await enqueueRunExecutionV2(run, tx);
await enqueueRunExecutionV2(run, tx, {
skipRetrying: run.environment.type === RuntimeEnvironmentType.DEVELOPMENT,
});
});
}
}
Expand Down Expand Up @@ -337,6 +339,7 @@ export class PerformRunExecutionV2Service {
runAt: data.task.delayUntil ?? undefined,
resumeTaskId: data.task.id,
isRetry,
skipRetrying: run.environment.type === RuntimeEnvironmentType.DEVELOPMENT,
});
}
});
Expand Down Expand Up @@ -409,6 +412,7 @@ export class PerformRunExecutionV2Service {
runAt: data.retryAt,
resumeTaskId: data.task.id,
isRetry,
skipRetrying: run.environment.type === RuntimeEnvironmentType.DEVELOPMENT,
});
});
}
Expand Down Expand Up @@ -464,7 +468,9 @@ export class PerformRunExecutionV2Service {
},
});

await enqueueRunExecutionV2(run, tx);
await enqueueRunExecutionV2(run, tx, {
skipRetrying: run.environment.type === RuntimeEnvironmentType.DEVELOPMENT,
});

break;
}
Expand Down
12 changes: 10 additions & 2 deletions apps/webapp/app/services/runs/startRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import type { ConnectionType, Integration, IntegrationConnection } from "@trigger.dev/database";
import {
RuntimeEnvironmentType,
type ConnectionType,
type Integration,
type IntegrationConnection,
} from "@trigger.dev/database";
import type { PrismaClient, PrismaClientOrTransaction } from "~/db.server";
import { prisma } from "~/db.server";
import { enqueueRunExecutionV2 } from "~/models/jobRunExecution.server";
Expand Down Expand Up @@ -83,7 +88,9 @@ export class StartRunService {

const updatedRun = await updateRun();

await enqueueRunExecutionV2(updatedRun, this.#prismaClient);
await enqueueRunExecutionV2(updatedRun, this.#prismaClient, {
skipRetrying: run.environment.type === RuntimeEnvironmentType.DEVELOPMENT,
});
}

async #handleMissingConnections(id: string, runConnectionsByKey: RunConnectionsByKey) {
Expand Down Expand Up @@ -140,6 +147,7 @@ async function findRun(tx: PrismaClientOrTransaction, id: string) {
where: { id },
include: {
queue: true,
environment: true,
version: {
include: {
integrations: {
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/services/sources/handleHttpSource.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { PrismaClient } from "~/db.server";
import { prisma } from "~/db.server";
import { workerQueue } from "../worker.server";
import { requestUrl } from "~/utils/requestUrl.server";
import { RuntimeEnvironmentType } from "@trigger.dev/database";

export class HandleHttpSourceService {
#prismaClient: PrismaClient;
Expand Down Expand Up @@ -55,6 +56,8 @@ export class HandleHttpSourceService {
{
queueName: `endpoint-${triggerSource.endpointId}`,
tx,
maxAttempts:
triggerSource.environment.type === RuntimeEnvironmentType.DEVELOPMENT ? 1 : undefined,
}
);
});
Expand Down
9 changes: 7 additions & 2 deletions apps/webapp/app/services/tasks/performTaskOperation.server.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { env } from "process";
import { Run } from "~/presenters/RunPresenter.server";
import {
FetchOperationSchema,
FetchRequestInit,
Expand All @@ -6,7 +8,7 @@ import {
RedactString,
calculateRetryAt,
} from "@trigger.dev/core";
import type { Task } from "@trigger.dev/database";
import { RuntimeEnvironmentType, type Task } from "@trigger.dev/database";
import { $transaction, PrismaClient, PrismaClientOrTransaction, prisma } from "~/db.server";
import { enqueueRunExecutionV2 } from "~/models/jobRunExecution.server";
import { formatUnknownError } from "~/utils/formatErrors.server";
Expand Down Expand Up @@ -244,7 +246,9 @@ export class PerformTaskOperationService {
}

async #resumeRunExecution(task: NonNullable<FoundTask>, prisma: PrismaClientOrTransaction) {
await enqueueRunExecutionV2(task.run, prisma);
await enqueueRunExecutionV2(task.run, prisma, {
skipRetrying: task.run.environment.type === RuntimeEnvironmentType.DEVELOPMENT,
});
}
}

Expand Down Expand Up @@ -281,6 +285,7 @@ async function findTask(prisma: PrismaClient, id: string) {
attempts: true,
run: {
include: {
environment: true,
queue: true,
},
},
Expand Down

0 comments on commit 03721cb

Please sign in to comment.