diff --git a/apps/webapp/app/models/jobRunExecution.server.ts b/apps/webapp/app/models/jobRunExecution.server.ts index 41076ae644..1b40d6526a 100644 --- a/apps/webapp/app/models/jobRunExecution.server.ts +++ b/apps/webapp/app/models/jobRunExecution.server.ts @@ -27,6 +27,7 @@ export type EnqueueRunExecutionV2Options = { runAt?: Date; resumeTaskId?: string; isRetry?: boolean; + skipRetrying?: boolean; }; export async function enqueueRunExecutionV2( @@ -47,6 +48,7 @@ export async function enqueueRunExecutionV2( tx, runAt: options.runAt, jobKey: `job_run:${run.id}`, + maxAttempts: options.skipRetrying ? 1 : undefined, } ); } diff --git a/apps/webapp/app/platform/zodWorker.server.ts b/apps/webapp/app/platform/zodWorker.server.ts index 327ab6c38c..ea3cafe4b9 100644 --- a/apps/webapp/app/platform/zodWorker.server.ts +++ b/apps/webapp/app/platform/zodWorker.server.ts @@ -195,12 +195,13 @@ export class ZodWorker { ): Promise { const task = this.#tasks[identifier]; - const optionsWithoutTx = omit(options ?? {}, ["tx"]); + const optionsWithoutTx = removeUndefinedKeys(omit(options ?? {}, ["tx"])); const taskWithoutJobKey = omit(task, ["jobKey"]); + // Make sure options passed in to enqueue take precedence over task options const spec = { - ...optionsWithoutTx, ...taskWithoutJobKey, + ...optionsWithoutTx, }; if (typeof task.queueName === "function") { @@ -437,3 +438,12 @@ export class ZodWorker { logger.debug(`[worker][${this.#name}] ${message}`, args); } } + +function removeUndefinedKeys(obj: T): T { + for (let key in obj) { + if (Object.prototype.hasOwnProperty.call(obj, key) && obj[key] === undefined) { + delete obj[key]; + } + } + return obj; +} diff --git a/apps/webapp/app/routes/api.v1.endpoints.$environmentId.$endpointSlug.index.$indexHookIdentifier.ts b/apps/webapp/app/routes/api.v1.endpoints.$environmentId.$endpointSlug.index.$indexHookIdentifier.ts index ce54de2329..5429b9b21e 100644 --- a/apps/webapp/app/routes/api.v1.endpoints.$environmentId.$endpointSlug.index.$indexHookIdentifier.ts +++ b/apps/webapp/app/routes/api.v1.endpoints.$environmentId.$endpointSlug.index.$indexHookIdentifier.ts @@ -1,4 +1,5 @@ import { ActionArgs, LoaderArgs, json } from "@remix-run/server-runtime"; +import { RuntimeEnvironmentType } from "@trigger.dev/database"; import { z } from "zod"; import { PrismaClient, prisma } from "~/db.server"; import { logger } from "~/services/logger.server"; @@ -99,6 +100,9 @@ export class TriggerEndpointIndexHookService { slug: endpointSlug, }, }, + include: { + environment: true, + }, }); if (!endpoint) { @@ -122,6 +126,8 @@ export class TriggerEndpointIndexHookService { }, { runAt: new Date(Date.now() + 5000), + maxAttempts: + endpoint.environment.type === RuntimeEnvironmentType.DEVELOPMENT ? 1 : undefined, } ); } diff --git a/apps/webapp/app/services/endpointApi.server.ts b/apps/webapp/app/services/endpointApi.server.ts index e7cba31e3d..6804b63948 100644 --- a/apps/webapp/app/services/endpointApi.server.ts +++ b/apps/webapp/app/services/endpointApi.server.ts @@ -28,7 +28,10 @@ export class EndpointApiError extends Error { } export class EndpointApi { - constructor(private apiKey: string, private url: string) {} + constructor( + private apiKey: string, + private url: string + ) {} async ping(endpointId: string): Promise { const response = await safeFetch(this.url, { diff --git a/apps/webapp/app/services/endpoints/createEndpoint.server.ts b/apps/webapp/app/services/endpoints/createEndpoint.server.ts index 3b2fbe9788..bd1839222f 100644 --- a/apps/webapp/app/services/endpoints/createEndpoint.server.ts +++ b/apps/webapp/app/services/endpoints/createEndpoint.server.ts @@ -4,6 +4,7 @@ import { AuthenticatedEnvironment } from "../apiAuth.server"; import { EndpointApi } from "../endpointApi.server"; import { workerQueue } from "../worker.server"; import { env } from "~/env.server"; +import { RuntimeEnvironmentType } from "@trigger.dev/database"; const indexingHookIdentifier = customAlphabet("0123456789abcdefghijklmnopqrstuvxyz", 10); @@ -51,6 +52,9 @@ export class CreateEndpointService { slug: id, }, }, + include: { + environment: true, + }, create: { environment: { connect: { @@ -83,7 +87,11 @@ export class CreateEndpointService { id: endpoint.id, source: "INTERNAL", }, - { tx } + { + tx, + maxAttempts: + endpoint.environment.type === RuntimeEnvironmentType.DEVELOPMENT ? 1 : undefined, + } ); return endpoint; diff --git a/apps/webapp/app/services/endpoints/validateCreateEndpoint.server.ts b/apps/webapp/app/services/endpoints/validateCreateEndpoint.server.ts index fca8e4d6ea..d578a26d9d 100644 --- a/apps/webapp/app/services/endpoints/validateCreateEndpoint.server.ts +++ b/apps/webapp/app/services/endpoints/validateCreateEndpoint.server.ts @@ -5,6 +5,7 @@ import { AuthenticatedEnvironment } from "../apiAuth.server"; import { workerQueue } from "../worker.server"; import { CreateEndpointError } from "./createEndpoint.server"; import { EndpointApi } from "../endpointApi.server"; +import { RuntimeEnvironmentType } from "@trigger.dev/database"; const indexingHookIdentifier = customAlphabet("0123456789abcdefghijklmnopqrstuvxyz", 10); @@ -35,6 +36,9 @@ export class ValidateCreateEndpointService { slug: validationResult.endpointId, }, }, + include: { + environment: true, + }, create: { environment: { connect: { @@ -67,7 +71,11 @@ export class ValidateCreateEndpointService { id: endpoint.id, source: "INTERNAL", }, - { tx } + { + tx, + maxAttempts: + endpoint.environment.type === RuntimeEnvironmentType.DEVELOPMENT ? 1 : undefined, + } ); return endpoint; diff --git a/apps/webapp/app/services/events/ingestSendEvent.server.ts b/apps/webapp/app/services/events/ingestSendEvent.server.ts index 41fffed53d..a4130bfdbc 100644 --- a/apps/webapp/app/services/events/ingestSendEvent.server.ts +++ b/apps/webapp/app/services/events/ingestSendEvent.server.ts @@ -7,7 +7,10 @@ import { logger } from "../logger.server"; export class IngestSendEvent { #prismaClient: PrismaClientOrTransaction; - constructor(prismaClient: PrismaClientOrTransaction = prisma, private deliverEvents = true) { + constructor( + prismaClient: PrismaClientOrTransaction = prisma, + private deliverEvents = true + ) { this.#prismaClient = prismaClient; } diff --git a/apps/webapp/app/services/runs/continueRun.server.ts b/apps/webapp/app/services/runs/continueRun.server.ts index 97f0ae4f30..00dad4ad22 100644 --- a/apps/webapp/app/services/runs/continueRun.server.ts +++ b/apps/webapp/app/services/runs/continueRun.server.ts @@ -1,3 +1,4 @@ +import { RuntimeEnvironmentType } from "@trigger.dev/database"; import { $transaction, Prisma, PrismaClient, prisma } from "~/db.server"; import { enqueueRunExecutionV2 } from "~/models/jobRunExecution.server"; @@ -16,6 +17,9 @@ export class ContinueRunService { async (tx) => { const run = await tx.jobRun.findUniqueOrThrow({ where: { id: runId }, + include: { + environment: true, + }, }); if (!RESUMABLE_STATUSES.includes(run.status)) { @@ -35,7 +39,9 @@ export class ContinueRunService { }, }); - await enqueueRunExecutionV2(run, tx); + await enqueueRunExecutionV2(run, tx, { + skipRetrying: run.environment.type === RuntimeEnvironmentType.DEVELOPMENT, + }); }, { timeout: 10000 } ); diff --git a/apps/webapp/app/services/runs/performRunExecutionV2.server.ts b/apps/webapp/app/services/runs/performRunExecutionV2.server.ts index 95d2bce6bb..01c8f9f4d2 100644 --- a/apps/webapp/app/services/runs/performRunExecutionV2.server.ts +++ b/apps/webapp/app/services/runs/performRunExecutionV2.server.ts @@ -6,7 +6,7 @@ import { RunJobSuccess, RunSourceContextSchema, } from "@trigger.dev/core"; -import type { Task } from "@trigger.dev/database"; +import { RuntimeEnvironmentType, type Task } from "@trigger.dev/database"; import { generateErrorMessage } from "zod-error"; import { eventRecordToApiJson } from "~/api.server"; import { $transaction, PrismaClient, PrismaClientOrTransaction, prisma } from "~/db.server"; @@ -135,7 +135,9 @@ export class PerformRunExecutionV2Service { }, }); - await enqueueRunExecutionV2(run, tx); + await enqueueRunExecutionV2(run, tx, { + skipRetrying: run.environment.type === RuntimeEnvironmentType.DEVELOPMENT, + }); }); } } @@ -337,6 +339,7 @@ export class PerformRunExecutionV2Service { runAt: data.task.delayUntil ?? undefined, resumeTaskId: data.task.id, isRetry, + skipRetrying: run.environment.type === RuntimeEnvironmentType.DEVELOPMENT, }); } }); @@ -409,6 +412,7 @@ export class PerformRunExecutionV2Service { runAt: data.retryAt, resumeTaskId: data.task.id, isRetry, + skipRetrying: run.environment.type === RuntimeEnvironmentType.DEVELOPMENT, }); }); } @@ -464,7 +468,9 @@ export class PerformRunExecutionV2Service { }, }); - await enqueueRunExecutionV2(run, tx); + await enqueueRunExecutionV2(run, tx, { + skipRetrying: run.environment.type === RuntimeEnvironmentType.DEVELOPMENT, + }); break; } diff --git a/apps/webapp/app/services/runs/startRun.server.ts b/apps/webapp/app/services/runs/startRun.server.ts index b9b647534b..818e310c54 100644 --- a/apps/webapp/app/services/runs/startRun.server.ts +++ b/apps/webapp/app/services/runs/startRun.server.ts @@ -1,4 +1,9 @@ -import type { ConnectionType, Integration, IntegrationConnection } from "@trigger.dev/database"; +import { + RuntimeEnvironmentType, + type ConnectionType, + type Integration, + type IntegrationConnection, +} from "@trigger.dev/database"; import type { PrismaClient, PrismaClientOrTransaction } from "~/db.server"; import { prisma } from "~/db.server"; import { enqueueRunExecutionV2 } from "~/models/jobRunExecution.server"; @@ -83,7 +88,9 @@ export class StartRunService { const updatedRun = await updateRun(); - await enqueueRunExecutionV2(updatedRun, this.#prismaClient); + await enqueueRunExecutionV2(updatedRun, this.#prismaClient, { + skipRetrying: run.environment.type === RuntimeEnvironmentType.DEVELOPMENT, + }); } async #handleMissingConnections(id: string, runConnectionsByKey: RunConnectionsByKey) { @@ -140,6 +147,7 @@ async function findRun(tx: PrismaClientOrTransaction, id: string) { where: { id }, include: { queue: true, + environment: true, version: { include: { integrations: { diff --git a/apps/webapp/app/services/sources/handleHttpSource.server.ts b/apps/webapp/app/services/sources/handleHttpSource.server.ts index 019a562206..1abd7da57e 100644 --- a/apps/webapp/app/services/sources/handleHttpSource.server.ts +++ b/apps/webapp/app/services/sources/handleHttpSource.server.ts @@ -2,6 +2,7 @@ import type { PrismaClient } from "~/db.server"; import { prisma } from "~/db.server"; import { workerQueue } from "../worker.server"; import { requestUrl } from "~/utils/requestUrl.server"; +import { RuntimeEnvironmentType } from "@trigger.dev/database"; export class HandleHttpSourceService { #prismaClient: PrismaClient; @@ -55,6 +56,8 @@ export class HandleHttpSourceService { { queueName: `endpoint-${triggerSource.endpointId}`, tx, + maxAttempts: + triggerSource.environment.type === RuntimeEnvironmentType.DEVELOPMENT ? 1 : undefined, } ); }); diff --git a/apps/webapp/app/services/tasks/performTaskOperation.server.ts b/apps/webapp/app/services/tasks/performTaskOperation.server.ts index eb425bbf57..fb5a6304ad 100644 --- a/apps/webapp/app/services/tasks/performTaskOperation.server.ts +++ b/apps/webapp/app/services/tasks/performTaskOperation.server.ts @@ -1,3 +1,5 @@ +import { env } from "process"; +import { Run } from "~/presenters/RunPresenter.server"; import { FetchOperationSchema, FetchRequestInit, @@ -6,7 +8,7 @@ import { RedactString, calculateRetryAt, } from "@trigger.dev/core"; -import type { Task } from "@trigger.dev/database"; +import { RuntimeEnvironmentType, type Task } from "@trigger.dev/database"; import { $transaction, PrismaClient, PrismaClientOrTransaction, prisma } from "~/db.server"; import { enqueueRunExecutionV2 } from "~/models/jobRunExecution.server"; import { formatUnknownError } from "~/utils/formatErrors.server"; @@ -244,7 +246,9 @@ export class PerformTaskOperationService { } async #resumeRunExecution(task: NonNullable, prisma: PrismaClientOrTransaction) { - await enqueueRunExecutionV2(task.run, prisma); + await enqueueRunExecutionV2(task.run, prisma, { + skipRetrying: task.run.environment.type === RuntimeEnvironmentType.DEVELOPMENT, + }); } } @@ -281,6 +285,7 @@ async function findTask(prisma: PrismaClient, id: string) { attempts: true, run: { include: { + environment: true, queue: true, }, },