diff --git a/.github/workflows/check-only.yml b/.github/workflows/check-only.yml new file mode 100644 index 00000000000..a9a85e6570a --- /dev/null +++ b/.github/workflows/check-only.yml @@ -0,0 +1,20 @@ +name: Check for .only flags + +on: + pull_request: + branches: [ "**" ] + +jobs: + check-only: + name: Check for .only in tests + runs-on: ubuntu-latest + timeout-minutes: 5 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Check for .only flags + run: | + chmod +x .github/workflows/scripts/stop-only.sh + .github/workflows/scripts/stop-only.sh . diff --git a/.github/workflows/scripts/stop-only.sh b/.github/workflows/scripts/stop-only.sh new file mode 100755 index 00000000000..b59f599b6e7 --- /dev/null +++ b/.github/workflows/scripts/stop-only.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +# Define the search directory (default to current directory) +SEARCH_DIR=${1:-.} + +# Find all matching test files and search for ".only" +echo "🔍 Searching for '.only' in test files" + +# Search for .only patterns and store results +FOUND_FILES=$(grep -r "it.only\|describe.only\|test.only" "$SEARCH_DIR" \ + --include="*.e2e.ts" \ + --include="*.e2e-ee.ts" \ + --include="*.spec.ts" \ + --include="*.test.ts" \ + --exclude-dir={node_modules,dist,build} \ + -n | cut -d ":" -f 1,2) + +# Check if any files were found +if [ -n "$FOUND_FILES" ]; then + echo "" + echo "🥵 Found '.only' in the following files:" + echo "$FOUND_FILES" + echo "" + echo "🧹🧹🧹 Please remove '.only' before committing!" + exit 1 +else + echo "✅ No '.only' found in test files." + exit 0 +fi diff --git a/apps/api/e2e/setup.ts b/apps/api/e2e/setup.ts index ba170c8d4e2..c57b9b5abe7 100644 --- a/apps/api/e2e/setup.ts +++ b/apps/api/e2e/setup.ts @@ -1,17 +1,32 @@ -import { testServer } from '@novu/testing'; +import { testServer, TestingQueueService, JobsService } from '@novu/testing'; import sinon from 'sinon'; import chai from 'chai'; -import { default as mongoose } from 'mongoose'; +import mongoose from 'mongoose'; +import { JobRepository } from '@novu/dal'; +import { JobTopicNameEnum } from '@novu/shared'; import { bootstrap } from '../src/bootstrap'; +const jobRepository = new JobRepository(); +const workflowQueue = new TestingQueueService(JobTopicNameEnum.WORKFLOW).queue; +const standardQueue = new TestingQueueService(JobTopicNameEnum.STANDARD).queue; +const subscriberProcessQueue = new TestingQueueService(JobTopicNameEnum.PROCESS_SUBSCRIBER).queue; + +let connection: typeof mongoose; + +async function getConnection() { + if (!connection) { + connection = await mongoose.connect(process.env.MONGO_URL); + } + + return connection; +} + async function dropDatabase() { try { - await mongoose.connect(process.env.MONGO_URL); - await mongoose.connection.db.dropDatabase(); + const conn = await getConnection(); + await conn.connection.db.dropDatabase(); } catch (error) { console.error('Error dropping the database:', error); - } finally { - await mongoose.disconnect(); } } @@ -27,9 +42,38 @@ before(async () => { after(async () => { await testServer.teardown(); await dropDatabase(); + if (connection) { + await connection.disconnect(); + } }); -// TODO: Remove this -afterEach(() => { +async function cleanup() { + const jobsService = new JobsService(); + await jobsService.runAllDelayedJobsImmediately(); + await jobsService.awaitAllJobs(); + + await Promise.all([workflowQueue.drain(), standardQueue.drain(), subscriberProcessQueue.drain()]); + + await jobRepository._model.deleteMany({}); +} + +function timeoutPromise(ms: number) { + // eslint-disable-next-line no-promise-executor-return + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +afterEach(async function () { + const TIMEOUT = 4500; sinon.restore(); + + try { + await Promise.race([ + cleanup(), + timeoutPromise(TIMEOUT).then(() => { + console.warn('Cleanup operation timed out after 5000ms - continuing with tests'); + }), + ]); + } catch (error) { + console.error('Error during cleanup:', error); + } }); diff --git a/apps/api/src/app/analytics/analytics.controller.ts b/apps/api/src/app/analytics/analytics.controller.ts index 499cfe1239a..33e7f7f5b8e 100644 --- a/apps/api/src/app/analytics/analytics.controller.ts +++ b/apps/api/src/app/analytics/analytics.controller.ts @@ -47,6 +47,12 @@ export class AnalyticsController { jobTitle: body.jobTitle, }); + this.analyticsService.updateGroup(user._id, user.organizationId, { + organizationType: body.organizationType, + companySize: body.companySize, + jobTitle: body.jobTitle, + }); + await this.hubspotIdentifyFormUsecase.execute( HubspotIdentifyFormCommand.create({ email: user.email as string, diff --git a/apps/api/src/app/billing/e2e/customer-subscription-deleted.e2e-ee.ts b/apps/api/src/app/billing/e2e/customer-subscription-deleted.e2e-ee.ts index c42257ccff1..97503af95d4 100644 --- a/apps/api/src/app/billing/e2e/customer-subscription-deleted.e2e-ee.ts +++ b/apps/api/src/app/billing/e2e/customer-subscription-deleted.e2e-ee.ts @@ -103,7 +103,7 @@ const verifyCustomerMock = { ], }; -describe('webhook event - customer.subscription.deleted #novu-v2', () => { +describe.skip('webhook event - customer.subscription.deleted #novu-v2', () => { const eeBilling = require('@novu/ee-billing'); if (!eeBilling) { throw new Error('ee-billing does not exist'); diff --git a/apps/api/src/app/billing/e2e/get-prices.e2e-ee.ts b/apps/api/src/app/billing/e2e/get-prices.e2e-ee.ts index 09bcb4e564d..2f9cd3e2475 100644 --- a/apps/api/src/app/billing/e2e/get-prices.e2e-ee.ts +++ b/apps/api/src/app/billing/e2e/get-prices.e2e-ee.ts @@ -19,8 +19,14 @@ describe('GetPrices #novu-v2', () => { }, }; let listPricesStub: sinon.SinonStub; + let getFeatureFlagStub: { execute: sinon.SinonStub }; + const IS_2025_Q1_TIERING_ENABLED = true; beforeEach(() => { + getFeatureFlagStub = { + execute: sinon.stub().resolves(IS_2025_Q1_TIERING_ENABLED), + }; + listPricesStub = stripeStub.prices.list; listPricesStub.onFirstCall().resolves({ data: [{ id: 'licensed_price_id_1' }], @@ -32,9 +38,14 @@ describe('GetPrices #novu-v2', () => { afterEach(() => { listPricesStub.reset(); + getFeatureFlagStub.execute.reset(); }); - const createUseCase = () => new GetPrices(stripeStub as any); + const createUseCase = () => new GetPrices(stripeStub, getFeatureFlagStub); + + const freeMeteredPriceLookupKey = IS_2025_Q1_TIERING_ENABLED + ? ['free_usage_notifications_10k'] + : ['free_usage_notifications']; const expectedPrices = [ { @@ -42,7 +53,7 @@ describe('GetPrices #novu-v2', () => { billingInterval: StripeBillingIntervalEnum.MONTH, prices: { licensed: ['free_flat_monthly'], - metered: ['free_usage_notifications'], + metered: freeMeteredPriceLookupKey, }, }, { diff --git a/apps/api/src/app/events/e2e/bridge-trigger.e2e.ts b/apps/api/src/app/events/e2e/bridge-trigger.e2e.ts index d513b5448db..f03d690ae6c 100644 --- a/apps/api/src/app/events/e2e/bridge-trigger.e2e.ts +++ b/apps/api/src/app/events/e2e/bridge-trigger.e2e.ts @@ -2,7 +2,7 @@ import axios from 'axios'; import { expect } from 'chai'; import sinon from 'sinon'; -import { SubscribersService, UserSession } from '@novu/testing'; +import { JobsService, SubscribersService, TestingQueueService, UserSession } from '@novu/testing'; import { ExecutionDetailsRepository, JobRepository, @@ -15,10 +15,10 @@ import { CreateWorkflowDto, ExecutionDetailsStatusEnum, JobStatusEnum, + JobTopicNameEnum, MessagesStatusEnum, StepTypeEnum, WorkflowCreationSourceEnum, - WorkflowOriginEnum, WorkflowResponseDto, } from '@novu/shared'; import { workflow } from '@novu/framework'; @@ -44,8 +44,21 @@ contexts.forEach((context: Context) => { let subscriber: SubscriberEntity; let subscriberService: SubscribersService; const executionDetailsRepository = new ExecutionDetailsRepository(); + const jobsService = new JobsService(); let bridge; + const printJobsState = async (prefix: string) => { + const count = await Promise.all([ + jobRepository.count({} as any), + new TestingQueueService(JobTopicNameEnum.WORKFLOW).queue.getWaitingCount(), + new TestingQueueService(JobTopicNameEnum.PROCESS_SUBSCRIBER).queue.getWaitingCount(), + new TestingQueueService(JobTopicNameEnum.STANDARD).queue.getWaitingCount(), + ]); + + // eslint-disable-next-line no-console + console.log(`${prefix} Jobs state `, count); + }; + beforeEach(async () => { bridgeServer = new BridgeServer(); bridge = context.isStateful ? undefined : { url: `${bridgeServer.serverPath}/novu` }; @@ -564,8 +577,17 @@ contexts.forEach((context: Context) => { await discoverAndSyncBridge(session, workflowsRepository, workflowId, bridgeServer); } + await printJobsState('before triggerEvent'); await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge); + await printJobsState('after triggerEvent'); + + await session.runAllDelayedJobsImmediately(); + await printJobsState('after runAllDelayedJobsImmediately'); + await session.waitForJobCompletion(); + await printJobsState('after waitForJobCompletion'); + + await session.runAllDelayedJobsImmediately(); await session.waitForJobCompletion(); const messagesAfter = await messageRepository.find({ @@ -598,7 +620,7 @@ contexts.forEach((context: Context) => { await bridgeServer.start({ workflows: [exceedMaxTierDurationWorkflow] }); if (context.isStateful) { - await discoverAndSyncBridge(session, workflowsRepository, workflowId, bridgeServer); + await discoverAndSyncBridge(session, workflowsRepository, exceedMaxTierDurationWorkflowId, bridgeServer); } const result = await triggerEvent(session, exceedMaxTierDurationWorkflowId, subscriber.subscriberId, {}, bridge); @@ -615,6 +637,7 @@ contexts.forEach((context: Context) => { }); it(`should trigger the bridge workflow with control default and payload data [${context.name}]`, async () => { + await printJobsState('test init'); const workflowId = `default-payload-params-workflow-${`${context.name}`}`; const newWorkflow = workflow( workflowId, @@ -655,10 +678,28 @@ contexts.forEach((context: Context) => { await discoverAndSyncBridge(session, workflowsRepository, workflowId, bridgeServer); } + await printJobsState('before trigger 1'); + await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge); + await session.waitForJobCompletion(); + + await printJobsState('after trigger 1'); + await triggerEvent(session, workflowId, subscriber.subscriberId, { name: 'payload_name' }, bridge); - await session.waitForJobCompletion(); + + await printJobsState('after trigger 2'); + + await jobsService.awaitAllJobs(); + + await printJobsState('before sleep'); + + // eslint-disable-next-line no-promise-executor-return + await new Promise((resolve) => setTimeout(resolve, 100)); + + await printJobsState('after sleep'); + + await jobsService.awaitAllJobs(); const sentMessage = await messageRepository.find({ _environmentId: session.environment._id, diff --git a/apps/api/src/app/events/e2e/digest-events.e2e.ts b/apps/api/src/app/events/e2e/digest-events.e2e.ts index c14169e1da9..2bf1d3b5bae 100644 --- a/apps/api/src/app/events/e2e/digest-events.e2e.ts +++ b/apps/api/src/app/events/e2e/digest-events.e2e.ts @@ -10,7 +10,7 @@ import { JobEntity, } from '@novu/dal'; import { StepTypeEnum, DigestTypeEnum, DigestUnitEnum, IDigestRegularMetadata } from '@novu/shared'; -import { UserSession, SubscribersService } from '@novu/testing'; +import { UserSession, SubscribersService, JobsService } from '@novu/testing'; const axiosInstance = axios.create(); @@ -26,6 +26,7 @@ describe('Trigger event - Digest triggered events - /v1/events/trigger (POST) #n let subscriberService: SubscribersService; const jobRepository = new JobRepository(); const messageRepository = new MessageRepository(); + const jobsService = new JobsService(); const triggerEvent = async (payload, transactionId?: string): Promise => { await axiosInstance.post( @@ -255,7 +256,7 @@ describe('Trigger event - Digest triggered events - /v1/events/trigger (POST) #n const mergedJobs = jobs.filter((elem) => elem.status !== JobStatusEnum.DELAYED); expect(mergedJobs && mergedJobs.length).to.eql(1); - await session.waitForJobCompletion(template?._id, false, 1); + await jobsService.awaitAllJobs(); const finalJobs = await jobRepository.find({ _environmentId: session.environment._id, diff --git a/apps/api/src/app/events/e2e/trigger-event.e2e.ts b/apps/api/src/app/events/e2e/trigger-event.e2e.ts index 1f5b64e0026..7b3f47bfbcf 100644 --- a/apps/api/src/app/events/e2e/trigger-event.e2e.ts +++ b/apps/api/src/app/events/e2e/trigger-event.e2e.ts @@ -14,7 +14,7 @@ import { SubscriberRepository, TenantRepository, } from '@novu/dal'; -import { SubscribersService, UserSession, WorkflowOverrideService } from '@novu/testing'; +import { SubscribersService, TestingQueueService, UserSession, WorkflowOverrideService } from '@novu/testing'; import { ActorTypeEnum, ChannelTypeEnum, @@ -30,6 +30,7 @@ import { FilterPartTypeEnum, IEmailBlock, InAppProviderIdEnum, + JobTopicNameEnum, PreviousStepTypeEnum, SmsProviderIdEnum, StepTypeEnum, @@ -68,6 +69,18 @@ describe('Trigger event - /v1/events/trigger (POST) #novu-v2', function () { const tenantRepository = new TenantRepository(); let novuClient: Novu; + const printJobsState = async (prefix: string) => { + const count = await Promise.all([ + jobRepository.count({} as any), + new TestingQueueService(JobTopicNameEnum.WORKFLOW).queue.getWaitingCount(), + new TestingQueueService(JobTopicNameEnum.PROCESS_SUBSCRIBER).queue.getWaitingCount(), + new TestingQueueService(JobTopicNameEnum.STANDARD).queue.getWaitingCount(), + ]); + + // eslint-disable-next-line no-console + console.log(`${prefix} Jobs state `, count); + }; + beforeEach(async () => { session = new UserSession(); await session.initialize(); @@ -126,6 +139,7 @@ describe('Trigger event - /v1/events/trigger (POST) #novu-v2', function () { ], }); + await printJobsState('before triggerEvent'); await novuClient.trigger({ workflowId: template.triggers[0].identifier, to: [subscriber.subscriberId], @@ -134,8 +148,12 @@ describe('Trigger event - /v1/events/trigger (POST) #novu-v2', function () { }, }); + await printJobsState('after triggerEvent'); + await session.waitForJobCompletion(template?._id, true, 0); + await printJobsState('after waitForJobCompletion'); + const messagesAfter = await messageRepository.find({ _environmentId: session.environment._id, _subscriberId: subscriber._id, diff --git a/apps/api/src/app/subscribers-v2/subscribers.controller.e2e.ts b/apps/api/src/app/subscribers-v2/subscribers.controller.e2e.ts index cc4cfc18524..93481e5fdf4 100644 --- a/apps/api/src/app/subscribers-v2/subscribers.controller.e2e.ts +++ b/apps/api/src/app/subscribers-v2/subscribers.controller.e2e.ts @@ -77,17 +77,24 @@ describe('Subscriber Controller E2E API Testing #novu-v2', () => { const subscribers = await getAllAndValidate({ searchParams: { phone: '1234567' }, + expectedTotalResults: 1, + expectedArraySize: 1, + }); + + await getAllAndValidate({ + searchParams: { phone: '7145' }, expectedTotalResults: 0, expectedArraySize: 0, }); - const subscribers2 = await getAllAndValidate({ + const subscribers3 = await getAllAndValidate({ searchParams: { phone: '+1234567890' }, expectedTotalResults: 1, expectedArraySize: 1, }); - expect(subscribers2[0].phone).to.equal('+1234567890'); + expect(subscribers[0].phone).to.equal('+1234567890'); + expect(subscribers3[0].phone).to.equal('+1234567890'); }); it('should find subscriber by full name', async () => { @@ -116,6 +123,59 @@ describe('Subscriber Controller E2E API Testing #novu-v2', () => { expect(subscribers[0].subscriberId).to.equal(`test-subscriber-${uuid}`); }); + + it('should find subscriber by partial email match', async () => { + const uuid = generateUUID(); + await createSubscriberAndValidate(uuid); + + const subscribers = await getAllAndValidate({ + searchParams: { email: `test-${uuid.substring(0, 5)}` }, + expectedTotalResults: 1, + expectedArraySize: 1, + }); + + expect(subscribers[0].email).to.contain(uuid); + }); + + it('should find subscriber by partial phone match', async () => { + const uuid = generateUUID(); + await createSubscriberAndValidate(uuid); + + const subscribers = await getAllAndValidate({ + searchParams: { phone: '123456' }, + expectedTotalResults: 1, + expectedArraySize: 1, + }); + + expect(subscribers[0].phone).to.equal('+1234567890'); + }); + + it('should find subscriber by partial name match', async () => { + const uuid = generateUUID(); + await createSubscriberAndValidate(uuid); + + const subscribers = await getAllAndValidate({ + searchParams: { name: `Test ${uuid.substring(0, 5)}` }, + expectedTotalResults: 1, + expectedArraySize: 1, + }); + + expect(subscribers[0].firstName).to.contain(uuid.substring(0, 5)); + expect(subscribers[0].lastName).to.equal('Subscriber'); + }); + + it('should find subscriber by partial subscriberId match', async () => { + const uuid = generateUUID(); + await createSubscriberAndValidate(uuid); + + const subscribers = await getAllAndValidate({ + searchParams: { subscriberId: `test-sub` }, + expectedTotalResults: 1, + expectedArraySize: 1, + }); + + expect(subscribers[0].subscriberId).to.equal(`test-subscriber-${uuid}`); + }); }); describe('List Subscriber Cursor Pagination', () => { diff --git a/apps/api/src/app/workflows-v2/usecases/build-step-issues/build-step-issues.usecase.ts b/apps/api/src/app/workflows-v2/usecases/build-step-issues/build-step-issues.usecase.ts index 47c103cddd9..9f2f1a4de6c 100644 --- a/apps/api/src/app/workflows-v2/usecases/build-step-issues/build-step-issues.usecase.ts +++ b/apps/api/src/app/workflows-v2/usecases/build-step-issues/build-step-issues.usecase.ts @@ -325,7 +325,7 @@ export class BuildStepIssuesUsecase { _environmentId: args.environmentId, _organizationId: args.organizationId, active: true, - primary: primaryNeeded ? true : undefined, + ...(primaryNeeded && { primary: true }), channel: args.stepTypeDto, }); diff --git a/apps/dashboard/public/images/country_flags.svg b/apps/dashboard/public/images/country_flags.svg deleted file mode 100644 index c23ddf10aa2..00000000000 --- a/apps/dashboard/public/images/country_flags.svg +++ /dev/null @@ -1 +0,0 @@ - diff --git a/apps/dashboard/src/api/activity.ts b/apps/dashboard/src/api/activity.ts index 7500ef3cdd0..3d7923c5771 100644 --- a/apps/dashboard/src/api/activity.ts +++ b/apps/dashboard/src/api/activity.ts @@ -80,8 +80,10 @@ function getDateRangeInDays(range: string): number { } } -export function getNotification(notificationId: string, environment: IEnvironment) { - return get<{ data: IActivity }>(`/notifications/${notificationId}`, { +export async function getNotification(notificationId: string, environment: IEnvironment): Promise { + const { data } = await get<{ data: IActivity }>(`/notifications/${notificationId}`, { environment, }); + + return data; } diff --git a/apps/dashboard/src/api/telemetry.ts b/apps/dashboard/src/api/telemetry.ts index b0c3baf4005..ee62378197f 100644 --- a/apps/dashboard/src/api/telemetry.ts +++ b/apps/dashboard/src/api/telemetry.ts @@ -17,7 +17,7 @@ interface IdentifyUserProps { pageName: string; jobTitle: JobTitleEnum; organizationType: OrganizationTypeEnum; - companySize?: CompanySizeEnum; + companySize?: CompanySizeEnum | string; anonymousId?: string | null; } diff --git a/apps/dashboard/src/components/activity/activity-empty-state.tsx b/apps/dashboard/src/components/activity/activity-empty-state.tsx index 419f7cb8ef9..3ba0ecbfb34 100644 --- a/apps/dashboard/src/components/activity/activity-empty-state.tsx +++ b/apps/dashboard/src/components/activity/activity-empty-state.tsx @@ -113,13 +113,7 @@ export function ActivityEmptyState({ }} className="flex items-center gap-6" > - + View Docs - + - - + } + /** + * Scroll to top bug workaround: https://github.com/pacocoursey/cmdk/issues/233#issuecomment-2015998940 + */ + onValueChange={() => { + // clear pending scroll + clearTimeout(scrollId.current); + + // the setTimeout is used to create a new task + // this is to make sure that we don't scroll until the user is done typing + // you can tweak the timeout duration ofc + scrollId.current = setTimeout(() => { + // inside your list select the first group and scroll to the top + const div = listRef.current; + div?.scrollTo({ top: 0, behavior: 'smooth' }); + }, 0); + }} + autoComplete="off" + /> + No country found. - - - {countryList.map(({ value, label }) => - value ? ( - - ) : null - )} - - + + {countryList.map(({ value, label }) => + value ? ( + + ) : null + )} + @@ -98,7 +118,7 @@ const InputComponent = React.forwardRef ( - + ) @@ -129,7 +149,7 @@ const FlagComponent = ({ country, countryName }: RPNInput.FlagProps) => { className="bg-foreground/20 flex h-4 w-6 overflow-hidden rounded-sm drop-shadow-md [&_svg]:size-full" key={country} > - {Flag ? : } + {Flag ? : } ); }; diff --git a/apps/dashboard/src/components/primitives/sheet.tsx b/apps/dashboard/src/components/primitives/sheet.tsx index c570b0107b2..4db9f2450ef 100644 --- a/apps/dashboard/src/components/primitives/sheet.tsx +++ b/apps/dashboard/src/components/primitives/sheet.tsx @@ -58,7 +58,7 @@ const SheetContent = React.forwardRef - + Close diff --git a/apps/dashboard/src/components/subscribers/create-subscriber-form.tsx b/apps/dashboard/src/components/subscribers/create-subscriber-form.tsx index a6af4f55f64..9a3009caa9f 100644 --- a/apps/dashboard/src/components/subscribers/create-subscriber-form.tsx +++ b/apps/dashboard/src/components/subscribers/create-subscriber-form.tsx @@ -21,6 +21,8 @@ import { UnsavedChangesAlertDialog } from '../unsaved-changes-alert-dialog'; import { LocaleSelect } from './locale-select'; import { CreateSubscriberFormSchema } from './schema'; import { TimezoneSelect } from './timezone-select'; +import { useTelemetry } from '@/hooks/use-telemetry'; +import { TelemetryEvent } from '@/utils/telemetry'; const extensions = [loadLanguage('json')?.extension ?? []]; const basicSetup = { lineNumbers: true, defaultKeymap: true }; @@ -36,6 +38,7 @@ type CreateSubscriberFormProps = { }; export const CreateSubscriberForm = (props: CreateSubscriberFormProps) => { + const track = useTelemetry(); const { onSuccess } = props; const form = useForm>({ @@ -52,6 +55,7 @@ export const CreateSubscriberForm = (props: CreateSubscriberFormProps) => { }, resolver: zodResolver(CreateSubscriberFormSchema), shouldFocusError: false, + mode: 'onBlur', }); const isDirty = Object.keys(form.formState.dirtyFields).length > 0; @@ -62,6 +66,7 @@ export const CreateSubscriberForm = (props: CreateSubscriberFormProps) => { onSuccess: () => { showSuccessToast('Created subscriber successfully', undefined, toastOptions); onSuccess?.(); + track(TelemetryEvent.SUBSCRIBER_CREATED); }, onError: (error) => { const errMsg = error instanceof Error ? error.message : 'Failed to create subscriber'; @@ -239,15 +244,21 @@ export const CreateSubscriberForm = (props: CreateSubscriberFormProps) => { /> -
+
( - + Locale - + { + const finalValue = field.value === val ? '' : val; + field.onChange(finalValue); + }} + /> @@ -257,10 +268,16 @@ export const CreateSubscriberForm = (props: CreateSubscriberFormProps) => { control={form.control} name="timezone" render={({ field }) => ( - + Timezone - + { + const finalValue = field.value === val ? '' : val; + field.onChange(finalValue); + }} + /> @@ -285,7 +302,7 @@ export const CreateSubscriberForm = (props: CreateSubscriberFormProps) => { className="overflow-auto" extensions={extensions} basicSetup={basicSetup} - placeholder="Custom data (JSON)" + placeholder="{}" height="100%" multiline {...field} diff --git a/apps/dashboard/src/components/subscribers/locale-select.tsx b/apps/dashboard/src/components/subscribers/locale-select.tsx index 4c44b73b288..5c1961d4c9b 100644 --- a/apps/dashboard/src/components/subscribers/locale-select.tsx +++ b/apps/dashboard/src/components/subscribers/locale-select.tsx @@ -1,78 +1,130 @@ -import { CountryFlags } from '@/components/icons/country-flags'; -import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from '@/components/primitives/select'; -import TruncatedText from '@/components/truncated-text'; import { locales } from '@/utils/locales'; -import { useState } from 'react'; -import { RiEarthLine } from 'react-icons/ri'; +import { cn } from '@/utils/ui'; +import { RiArrowDownSLine, RiCheckLine, RiEarthLine, RiSearchLine } from 'react-icons/ri'; +import { type Country } from 'react-phone-number-input'; +import flags from 'react-phone-number-input/flags'; +import { Button } from '../primitives/button'; +import { Command, CommandEmpty, CommandGroup, CommandInput, CommandItem, CommandList } from '../primitives/command'; +import { Popover, PopoverContent, PopoverTrigger } from '../primitives/popover'; +import TruncatedText from '../truncated-text'; +import { useRef, useState } from 'react'; export function LocaleSelect({ value, - defaultOption, + onChange, disabled, - onValueChange, readOnly, - required, }: { value?: string; - defaultOption?: string; - size?: 'sm' | 'md'; disabled?: boolean; readOnly?: boolean; - required?: boolean; - onValueChange: (val: string) => void; + onChange: (val: string) => void; }) { const [open, setOpen] = useState(false); + const currentCountryCode = value?.split('_')?.[1] as Country; + const CurrentFlag = currentCountryCode ? flags[currentCountryCode] : RiEarthLine; + + const listRef = useRef(null); + const scrollId = useRef>(); return ( - + + + + + } + /** + * Scroll to top bug workaround: https://github.com/pacocoursey/cmdk/issues/233#issuecomment-2015998940 + */ + onValueChange={() => { + // clear pending scroll + clearTimeout(scrollId.current); + + // the setTimeout is used to create a new task + // this is to make sure that we don't scroll until the user is done typing + // you can tweak the timeout duration ofc + scrollId.current = setTimeout(() => { + // inside your list select the first group and scroll to the top + const div = listRef.current; + div?.scrollTo({ top: 0, behavior: 'smooth' }); + }, 0); + }} + /> + + No locale found. + + {locales.map((item) => ( + { + onChange(val); + setOpen(false); + }} + currentValue={value} + /> + ))} + + + + + ); } + +const FlagItem = ({ + countryCode, + languageName, + optionValue, + onChange, + currentValue, +}: { + countryCode: string; + languageName: string; + optionValue: string; + onChange: (val: string) => void; + currentValue?: string; +}) => { + const CurrentFlag = countryCode ? flags[countryCode as Country] : RiEarthLine; + const isSelected = optionValue === currentValue; + + return ( + onChange(optionValue)} + > +
+ {CurrentFlag && } + + {optionValue} - {languageName} + + +
+
+ ); +}; diff --git a/apps/dashboard/src/components/subscribers/preferences/preferences.tsx b/apps/dashboard/src/components/subscribers/preferences/preferences.tsx index 75b63104efe..f86b14fc1be 100644 --- a/apps/dashboard/src/components/subscribers/preferences/preferences.tsx +++ b/apps/dashboard/src/components/subscribers/preferences/preferences.tsx @@ -4,6 +4,8 @@ import { SidebarContent } from '@/components/side-navigation/sidebar'; import { PreferencesItem } from '@/components/subscribers/preferences/preferences-item'; import { WorkflowPreferences } from '@/components/subscribers/preferences/workflow-preferences'; import { usePatchSubscriberPreferences } from '@/hooks/use-patch-subscriber-preferences'; +import { useTelemetry } from '@/hooks/use-telemetry'; +import { TelemetryEvent } from '@/utils/telemetry'; import { GetSubscriberPreferencesDto, PatchPreferenceChannelsDto } from '@novu/api/models/components'; import { ChannelTypeEnum } from '@novu/shared'; import { motion } from 'motion/react'; @@ -18,9 +20,12 @@ type PreferencesProps = { export const Preferences = (props: PreferencesProps) => { const { subscriberPreferences, subscriberId, readOnly = false } = props; + const track = useTelemetry(); + const { patchSubscriberPreferences } = usePatchSubscriberPreferences({ onSuccess: () => { showSuccessToast('Subscriber preferences updated successfully'); + track(TelemetryEvent.SUBSCRIBER_PREFERENCES_UPDATED); }, }); diff --git a/apps/dashboard/src/components/subscribers/schema.ts b/apps/dashboard/src/components/subscribers/schema.ts index 4ec8fa408e8..4b864441b7e 100644 --- a/apps/dashboard/src/components/subscribers/schema.ts +++ b/apps/dashboard/src/components/subscribers/schema.ts @@ -12,8 +12,8 @@ export const SubscriberFormSchema = z.object({ .or(z.literal('')) .optional(), avatar: z.string().optional(), - locale: z.string().optional(), - timezone: z.string().optional(), + locale: z.string().optional().nullable(), + timezone: z.string().optional().nullable(), data: z .string() .transform((str, ctx) => { @@ -42,4 +42,6 @@ export const CreateSubscriberFormSchema = SubscriberFormSchema.extend({ .refine((val) => val === '' || z.string().email().safeParse(val).success, { message: 'Invalid email', }), + locale: z.string().optional(), + timezone: z.string().optional(), }); diff --git a/apps/dashboard/src/components/subscribers/subscriber-activity-drawer.tsx b/apps/dashboard/src/components/subscribers/subscriber-activity-drawer.tsx new file mode 100644 index 00000000000..b09b5fa1050 --- /dev/null +++ b/apps/dashboard/src/components/subscribers/subscriber-activity-drawer.tsx @@ -0,0 +1,67 @@ +import { forwardRef } from 'react'; +import { VisuallyHidden } from '@radix-ui/react-visually-hidden'; + +import { SheetContent, SheetDescription, SheetTitle } from '@/components/primitives/sheet'; +import { usePullActivity } from '@/hooks/use-pull-activity'; +import { Sheet } from '@/components/primitives/sheet'; +import { cn } from '@/utils/ui'; +import { ActivityPanel } from '@/components/activity/activity-panel'; +import { ActivitySkeleton } from '@/components/activity/activity-skeleton'; +import { ActivityError } from '@/components/activity/activity-error'; +import { ActivityHeader } from '@/components/activity/activity-header'; +import { ActivityOverview } from '@/components/activity/components/activity-overview'; +import { ActivityLogs } from '@/components/activity/activity-logs'; + +type ActivityPanelDrawerProps = { + onActivitySelect: (activityId: string) => void; + activityId: string; +}; + +export const ActivityDetailsDrawer = forwardRef((props, ref) => { + const { activityId, onActivitySelect } = props; + const isOpen = !!activityId; + const { activity, isPending, error } = usePullActivity(activityId); + + return ( + { + if (!isOpen) { + onActivitySelect(''); + } + }} + > +
+ + + + + + + {isPending ? ( + + ) : error || !activity ? ( + + ) : ( + <> + + + + + )} + + + + ); +}); diff --git a/apps/dashboard/src/components/subscribers/subscriber-activity-list.tsx b/apps/dashboard/src/components/subscribers/subscriber-activity-list.tsx index 2d07dbda692..0fb1f7ee550 100644 --- a/apps/dashboard/src/components/subscribers/subscriber-activity-list.tsx +++ b/apps/dashboard/src/components/subscribers/subscriber-activity-list.tsx @@ -38,11 +38,13 @@ export const SubscriberActivityList = ({ activities, hasChangesInFilters, onClearFilters, + onActivitySelect, }: { isLoading: boolean; activities: IActivity[]; hasChangesInFilters: boolean; onClearFilters: () => void; + onActivitySelect: (activityId: string) => void; }) => { if (!isLoading && activities.length === 0) { return ( @@ -117,21 +119,26 @@ export const SubscriberActivityList = ({ key={activity._id} {...staggerSettings(index)} className="border-b-stroke-soft flex w-full cursor-pointer border-b last:border-b-0" + onClick={() => { + onActivitySelect(activity._id); + }} > -
- {activity.template.origin === WorkflowOriginEnum.EXTERNAL ? ( +
+ {activity.template?.origin === WorkflowOriginEnum.EXTERNAL ? ( ) : ( - + )}
- {activity.template.name} + + {activity.template?.name ?? 'Deleted workflow'} + - {activity.template.triggers.map((trigger) => trigger.identifier).join(', ')} + {activity.template?.triggers.map((trigger) => trigger.identifier).join(', ')}
-
+
@@ -150,7 +157,7 @@ export const SubscriberActivityList = ({
-
+
({ dateRange: '30d', @@ -21,7 +22,7 @@ const getInitialFilters = (subscriberId: string): ActivityFiltersData => ({ export const SubscriberActivity = ({ subscriberId }: { subscriberId: string }) => { const { currentEnvironment } = useEnvironment(); const [filters, setFilters] = useState(getInitialFilters(subscriberId)); - + const [activityItemId, setActivityItemId] = useState(''); const { activities, isLoading } = useFetchActivities( { filters, @@ -65,6 +66,10 @@ export const SubscriberActivity = ({ subscriberId }: { subscriberId: string }) = return params; }, [subscriberId, filters]); + const handleActivitySelect = (activityId: string) => { + setActivityItemId(activityId); + }; + return (
@@ -81,8 +86,9 @@ export const SubscriberActivity = ({ subscriberId }: { subscriberId: string }) = activities={activities} hasChangesInFilters={hasChangesInFilters} onClearFilters={handleClearFilters} + onActivitySelect={handleActivitySelect} /> - + To view more detailed activity, View{' '}
+
); }; diff --git a/apps/dashboard/src/components/subscribers/subscriber-overview-form.tsx b/apps/dashboard/src/components/subscribers/subscriber-overview-form.tsx index c64a241631d..18542572538 100644 --- a/apps/dashboard/src/components/subscribers/subscriber-overview-form.tsx +++ b/apps/dashboard/src/components/subscribers/subscriber-overview-form.tsx @@ -8,9 +8,9 @@ import { cn } from '@/utils/ui'; import { zodResolver } from '@hookform/resolvers/zod'; import { SubscriberResponseDto } from '@novu/api/models/components'; import { loadLanguage } from '@uiw/codemirror-extensions-langs'; -import { useState } from 'react'; +import { useEffect, useState } from 'react'; import { useForm } from 'react-hook-form'; -import { RiDeleteBin2Line } from 'react-icons/ri'; +import { RiDeleteBin2Line, RiMailLine } from 'react-icons/ri'; import { Link, useBlocker, useNavigate } from 'react-router-dom'; import { ExternalToast } from 'sonner'; import { z } from 'zod'; @@ -28,6 +28,8 @@ import { UnsavedChangesAlertDialog } from '../unsaved-changes-alert-dialog'; import { SubscriberFormSchema } from './schema'; import { TimezoneSelect } from './timezone-select'; import { getSubscriberTitle } from './utils'; +import { useTelemetry } from '@/hooks/use-telemetry'; +import { TelemetryEvent } from '@/utils/telemetry'; const extensions = [loadLanguage('json')?.extension ?? []]; const basicSetup = { lineNumbers: true, defaultKeymap: true }; @@ -46,10 +48,12 @@ type SubscriberOverviewFormProps = { export function SubscriberOverviewForm(props: SubscriberOverviewFormProps) { const { subscriber, readOnly = false } = props; const [isDeleteModalOpen, setIsDeleteModalOpen] = useState(false); + const track = useTelemetry(); const { deleteSubscriber, isPending: isDeleteSubscriberPending } = useDeleteSubscriber({ onSuccess: () => { showSuccessToast(`Deleted subscriber: ${getSubscriberTitle(subscriber)}`, undefined, toastOptions); + track(TelemetryEvent.SUBSCRIBER_DELETED); }, onError: () => { showErrorToast('Failed to delete subscriber', undefined, toastOptions); @@ -60,14 +64,14 @@ export function SubscriberOverviewForm(props: SubscriberOverviewFormProps) { const form = useForm>({ defaultValues: { - avatar: subscriber.avatar, - email: subscriber.email, - phone: subscriber.phone, - firstName: subscriber.firstName, - lastName: subscriber.lastName, - locale: subscriber.locale, - timezone: subscriber.timezone, - data: JSON.stringify(subscriber.data, null, 2), + avatar: subscriber?.avatar ?? '', + email: subscriber.email || null, + phone: subscriber.phone ?? '', + firstName: subscriber.firstName ?? '', + lastName: subscriber.lastName ?? '', + locale: subscriber.locale ?? null, + timezone: subscriber.timezone ?? null, + data: JSON.stringify(subscriber.data, null, 2) ?? '', }, resolver: zodResolver(SubscriberFormSchema), shouldFocusError: false, @@ -75,16 +79,38 @@ export function SubscriberOverviewForm(props: SubscriberOverviewFormProps) { const { patchSubscriber } = usePatchSubscriber({ onSuccess: (data) => { - showSuccessToast(`Updated subscriber: ${getSubscriberTitle(subscriber)}`, undefined, toastOptions); - form.reset({ ...data, data: JSON.stringify(data.data, null, 2) }); + showSuccessToast(`Updated subscriber: ${getSubscriberTitle(data)}`, undefined, toastOptions); + form.reset({ ...data, data: JSON.stringify(data.data, null, 2) ?? '' }); + track(TelemetryEvent.SUBSCRIBER_EDITED); }, onError: () => { showErrorToast('Failed to update subscriber', undefined, toastOptions); }, }); - const blocker = useBlocker(form.formState.isDirty); - useBeforeUnload(form.formState.isDirty); + /** + * Fixes the issue where you update the form, + * then close the drawer and re-open it, + * the form shows the stale data. + */ + useEffect(() => { + if (subscriber) { + form.reset({ + avatar: subscriber?.avatar ?? '', + email: subscriber.email || null, + phone: subscriber.phone ?? '', + firstName: subscriber.firstName ?? '', + lastName: subscriber.lastName ?? '', + locale: subscriber.locale ?? null, + timezone: subscriber.timezone ?? null, + data: JSON.stringify(subscriber.data, null, 2) ?? '', + }); + } + }, [subscriber, form]); + + const isDirty = Object.keys(form.formState.dirtyFields).length > 0; + const blocker = useBlocker(isDirty); + useBeforeUnload(isDirty); const onSubmit = async (formData: z.infer) => { const dirtyFields = form.formState.dirtyFields; @@ -95,7 +121,7 @@ export function SubscriberOverviewForm(props: SubscriberOverviewFormProps) { const data = JSON.parse(JSON.stringify(formData.data)); return { ...acc, data: data === '' ? {} : data }; } - return { ...acc, [typedKey]: formData[typedKey]?.trim() }; + return { ...acc, [typedKey]: formData[typedKey] === null ? null : formData[typedKey]?.trim() }; }, {}); if (!Object.keys(dirtyPayload).length) { @@ -132,7 +158,7 @@ export function SubscriberOverviewForm(props: SubscriberOverviewFormProps) { Subscriber profile Image can only be updated via API -
+
} - size="xs" + readOnly + disabled />
@@ -223,12 +251,17 @@ export function SubscriberOverviewForm(props: SubscriberOverviewFormProps) { {...field} readOnly={readOnly} type="email" - placeholder={field.name} + placeholder="hello@novu.co" id={field.name} value={field.value || undefined} - onChange={field.onChange} + onChange={(event) => { + const { value } = event.target; + const finalValue = value === '' ? null : value; + field.onChange(finalValue); + }} hasError={!!fieldState.error} size="xs" + leadingIcon={RiMailLine} /> @@ -245,7 +278,7 @@ export function SubscriberOverviewForm(props: SubscriberOverviewFormProps) { @@ -257,15 +290,22 @@ export function SubscriberOverviewForm(props: SubscriberOverviewFormProps) {
-
+
( - + Locale - + { + const finalValue = field.value === val ? null : val; + field.onChange(finalValue); + }} + readOnly={readOnly} + /> @@ -275,10 +315,17 @@ export function SubscriberOverviewForm(props: SubscriberOverviewFormProps) { control={form.control} name="timezone" render={({ field }) => ( - + Timezone - + { + const finalValue = field.value === val ? null : val; + field.onChange(finalValue); + }} + readOnly={readOnly} + /> @@ -301,7 +348,7 @@ export function SubscriberOverviewForm(props: SubscriberOverviewFormProps) { className="overflow-auto" extensions={extensions} basicSetup={basicSetup} - placeholder="Custom data (JSON)" + placeholder="{}" height="100%" multiline {...field} @@ -320,7 +367,7 @@ export function SubscriberOverviewForm(props: SubscriberOverviewFormProps) {
{subscriber.updatedAt && ( - + Updated at{' '} {formatDateSimple(subscriber.updatedAt, { month: 'short', @@ -347,7 +394,7 @@ export function SubscriberOverviewForm(props: SubscriberOverviewFormProps) { > Delete subscriber -
@@ -363,7 +410,7 @@ export function SubscriberOverviewForm(props: SubscriberOverviewFormProps) { setIsDeleteModalOpen(false); navigate('../', { relative: 'path' }); }} - title={`Delete subscriber`} + title="Delete subscriber" description={ Are you sure you want to delete subscriber{' '} diff --git a/apps/dashboard/src/components/subscribers/subscriber-tabs.tsx b/apps/dashboard/src/components/subscribers/subscriber-tabs.tsx index 46603a8989f..8ebe7978277 100644 --- a/apps/dashboard/src/components/subscribers/subscriber-tabs.tsx +++ b/apps/dashboard/src/components/subscribers/subscriber-tabs.tsx @@ -17,7 +17,10 @@ type SubscriberOverviewProps = { const SubscriberOverview = (props: SubscriberOverviewProps) => { const { subscriberId, readOnly = false } = props; - const { data, isPending } = useFetchSubscriber({ subscriberId }); + const { data, isPending } = useFetchSubscriber({ + subscriberId, + }); + if (isPending) { return ; } diff --git a/apps/dashboard/src/components/subscribers/timezone-select.tsx b/apps/dashboard/src/components/subscribers/timezone-select.tsx index 1da0e950e6e..d77018cf5bc 100644 --- a/apps/dashboard/src/components/subscribers/timezone-select.tsx +++ b/apps/dashboard/src/components/subscribers/timezone-select.tsx @@ -1,98 +1,103 @@ -import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from '@/components/primitives/select'; -import TruncatedText from '@/components/truncated-text'; -import { memo, useState } from 'react'; -import { RiTimeLine } from 'react-icons/ri'; +import { cn } from '@/utils/ui'; +import { RiArrowDownSLine, RiCheckLine, RiSearchLine, RiTimeLine } from 'react-icons/ri'; import { useTimezoneSelect } from 'react-timezone-select'; - -// Define a type for timezone options. -type TimezoneOption = { - label: string; - value: string; -}; - -interface TimezoneOptionsProps { - options: TimezoneOption[]; -} - -// Extracted and memoized component for rendering timezone options. -const TimezoneOptions = memo(function TimezoneOptions({ options }: TimezoneOptionsProps) { - return ( - <> - {options.map((item) => ( - - {item.label} - - ))} - - ); -}); +import { Button } from '../primitives/button'; +import { Command, CommandEmpty, CommandGroup, CommandInput, CommandItem, CommandList } from '../primitives/command'; +import { Popover, PopoverContent, PopoverTrigger } from '../primitives/popover'; +import TruncatedText from '../truncated-text'; +import { useRef, useState } from 'react'; export function TimezoneSelect({ value, - defaultOption, disabled, - onValueChange, + onChange, readOnly, - required, }: { value?: string; - defaultOption?: string; - size?: 'sm' | 'md'; disabled?: boolean; readOnly?: boolean; - required?: boolean; - onValueChange: (val: string) => void; + onChange: (val: string) => void; }) { - // State to track whether the select is open. - const [isOpen, setIsOpen] = useState(false); - // Get timezone options and the parse function. + const [open, setOpen] = useState(false); const { options, parseTimezone } = useTimezoneSelect({ labelStyle: 'abbrev', displayValue: 'UTC' }); - - const handleValueChange = (val: string) => { - const parsedValue = parseTimezone(val); - onValueChange(parsedValue.value); - }; + const listRef = useRef(null); + const scrollId = useRef>(); return ( - + + + + + } + /** + * Scroll to top bug workaround: https://github.com/pacocoursey/cmdk/issues/233#issuecomment-2015998940 + */ + onValueChange={() => { + // clear pending scroll + clearTimeout(scrollId.current); + + // the setTimeout is used to create a new task + // this is to make sure that we don't scroll until the user is done typing + // you can tweak the timeout duration ofc + scrollId.current = setTimeout(() => { + // inside your list select the first group and scroll to the top + const div = listRef.current; + div?.scrollTo({ top: 0, behavior: 'smooth' }); + }, 0); + }} + /> + + No timezone found. + + + {options.map((item) => ( + { + const parsedValue = parseTimezone(item.value); + onChange(parsedValue.value); + setOpen(false); + }} + key={item.value} + > + {item.label} + + + ))} + + + + + ); } diff --git a/apps/dashboard/src/components/workflow-editor/test-workflow/test-workflow-logs-sidebar.tsx b/apps/dashboard/src/components/workflow-editor/test-workflow/test-workflow-logs-sidebar.tsx index 55c8a915e8a..8c23381b81a 100644 --- a/apps/dashboard/src/components/workflow-editor/test-workflow/test-workflow-logs-sidebar.tsx +++ b/apps/dashboard/src/components/workflow-editor/test-workflow/test-workflow-logs-sidebar.tsx @@ -10,6 +10,11 @@ import { WorkflowTriggerInboxIllustration } from '../../icons/workflow-trigger-i import { Button } from '../../primitives/button'; import { TestWorkflowFormType } from '../schema'; import { TestWorkflowInstructions } from './test-workflow-instructions'; +import { ActivitySkeleton } from '@/components/activity/activity-skeleton'; +import { ActivityError } from '@/components/activity/activity-error'; +import { ActivityHeader } from '@/components/activity/activity-header'; +import { ActivityOverview } from '@/components/activity/components/activity-overview'; +import { ActivityLogs } from '@/components/activity/activity-logs'; type TestWorkflowLogsSidebarProps = { transactionId?: string; @@ -23,7 +28,7 @@ export const TestWorkflowLogsSidebar = ({ transactionId, workflow }: TestWorkflo const [showInstructions, setShowInstructions] = useState(false); const to = useWatch({ name: 'to', control }); const payload = useWatch({ name: 'payload', control }); - const { activities } = useFetchActivities( + const { activities, isPending, error } = useFetchActivities( { filters: transactionId ? { transactionId } : undefined, }, @@ -32,7 +37,8 @@ export const TestWorkflowLogsSidebar = ({ transactionId, workflow }: TestWorkflo refetchInterval: shouldRefetch ? 1000 : false, } ); - const activityId: string | undefined = parentActivityId ?? activities?.[0]?._id; + const activity = activities?.[0]; + const activityId: string | undefined = parentActivityId ?? activity?._id; useEffect(() => { if (activityId) { @@ -61,36 +67,37 @@ export const TestWorkflowLogsSidebar = ({ transactionId, workflow }: TestWorkflo
) : activityId ? ( <> - - {!workflow?.lastTriggeredAt && ( -
-
-
-
- -
-
-
You have triggered the workflow!
-
Now integrate the workflow in your application.
+ + {isPending ? ( + + ) : error || !activity ? ( + + ) : ( + <> + + + + + )} + {!workflow?.lastTriggeredAt && ( +
+
+
+
+ +
+
+
You have triggered the workflow!
+
Now integrate the workflow in your application.
+
+
-
-
- )} + )} + ) : (
diff --git a/apps/dashboard/src/components/workflow-editor/workflow-checklist.tsx b/apps/dashboard/src/components/workflow-editor/workflow-checklist.tsx index 5ad34a99090..ba9c502586c 100644 --- a/apps/dashboard/src/components/workflow-editor/workflow-checklist.tsx +++ b/apps/dashboard/src/components/workflow-editor/workflow-checklist.tsx @@ -221,7 +221,7 @@ function useChecklistItems(steps: Step[]) { key: 'trigger', title: 'Trigger workflow from your application', description: 'Trigger the workflow to test it in production', - isCompleted: () => workflow?.lastTriggeredAt !== undefined, + isCompleted: () => !!workflow?.lastTriggeredAt, onClick: () => { telemetry(TelemetryEvent.WORKFLOW_CHECKLIST_STEP_CLICKED, { stepTitle: 'Trigger workflow' }); navigate( diff --git a/apps/dashboard/src/hooks/use-fetch-activity.ts b/apps/dashboard/src/hooks/use-fetch-activity.ts index 07ab1a67c72..79f348cc9c0 100644 --- a/apps/dashboard/src/hooks/use-fetch-activity.ts +++ b/apps/dashboard/src/hooks/use-fetch-activity.ts @@ -5,7 +5,7 @@ import { useEnvironment } from '@/context/environment/hooks'; import { getNotification } from '@/api/activity'; export function useFetchActivity( - { activityId }: { activityId?: string }, + { activityId }: { activityId?: string | null }, { refetchInterval = false, refetchOnWindowFocus = false, @@ -14,7 +14,7 @@ export function useFetchActivity( ) { const { currentEnvironment } = useEnvironment(); - const { data, isPending, error } = useQuery<{ data: IActivity }>({ + const { data, isPending, error } = useQuery({ queryKey: [QueryKeys.fetchActivity, currentEnvironment?._id, activityId], queryFn: () => getNotification(activityId!, currentEnvironment!), enabled: !!currentEnvironment?._id && !!activityId, @@ -24,7 +24,7 @@ export function useFetchActivity( }); return { - activity: data?.data, + activity: data, isPending, error, }; diff --git a/apps/dashboard/src/hooks/use-patch-subscriber.ts b/apps/dashboard/src/hooks/use-patch-subscriber.ts index e4ea7c82255..54ff0e687c2 100644 --- a/apps/dashboard/src/hooks/use-patch-subscriber.ts +++ b/apps/dashboard/src/hooks/use-patch-subscriber.ts @@ -17,6 +17,8 @@ export const usePatchSubscriber = ( mutationFn: (args: PatchSubscriberParameters) => patchSubscriber({ environment: currentEnvironment!, ...args }), ...options, onSuccess: async (data, variables, ctx) => { + await queryClient.setQueryData([QueryKeys.fetchSubscriber, variables.subscriberId], data); + await queryClient.invalidateQueries({ queryKey: [QueryKeys.fetchSubscribers], }); diff --git a/apps/dashboard/src/hooks/use-pull-activity.ts b/apps/dashboard/src/hooks/use-pull-activity.ts new file mode 100644 index 00000000000..ddfd768ea19 --- /dev/null +++ b/apps/dashboard/src/hooks/use-pull-activity.ts @@ -0,0 +1,44 @@ +import { useEffect, useState } from 'react'; +import { useQueryClient } from '@tanstack/react-query'; +import { JobStatusEnum } from '@novu/shared'; + +import { useEnvironment } from '@/context/environment/hooks'; +import { useFetchActivity } from '@/hooks/use-fetch-activity'; +import { QueryKeys } from '@/utils/query-keys'; + +export const usePullActivity = (activityId?: string | null) => { + const queryClient = useQueryClient(); + const [shouldRefetch, setShouldRefetch] = useState(true); + const { currentEnvironment } = useEnvironment(); + const { activity, isPending, error } = useFetchActivity( + { activityId }, + { + refetchInterval: shouldRefetch ? 1000 : false, + } + ); + + useEffect(() => { + if (!activity) return; + + const isPending = activity.jobs?.some( + (job) => + job.status === JobStatusEnum.PENDING || + job.status === JobStatusEnum.QUEUED || + job.status === JobStatusEnum.RUNNING || + job.status === JobStatusEnum.DELAYED + ); + + // Only stop refetching if we have an activity and it's not pending + setShouldRefetch(isPending || !activity?.jobs?.length); + + queryClient.invalidateQueries({ + queryKey: [QueryKeys.fetchActivity, currentEnvironment?._id, activityId], + }); + }, [activity, queryClient, currentEnvironment, activityId]); + + return { + activity, + isPending, + error, + }; +}; diff --git a/apps/dashboard/src/pages/activity-feed.tsx b/apps/dashboard/src/pages/activity-feed.tsx index 3b0990baefb..f4540db61e3 100644 --- a/apps/dashboard/src/pages/activity-feed.tsx +++ b/apps/dashboard/src/pages/activity-feed.tsx @@ -9,10 +9,18 @@ import { ResizableHandle, ResizablePanel, ResizablePanelGroup } from '@/componen import { useActivityUrlState } from '@/hooks/use-activity-url-state'; import { PageMeta } from '../components/page-meta'; import { defaultActivityFilters } from '@/components/activity/constants'; +import { usePullActivity } from '@/hooks/use-pull-activity'; +import { ActivityHeader } from '@/components/activity/activity-header'; +import { ActivityOverview } from '@/components/activity/components/activity-overview'; +import { ActivityLogs } from '@/components/activity/activity-logs'; +import { ActivitySkeleton } from '@/components/activity/activity-skeleton'; +import { ActivityError } from '@/components/activity/activity-error'; export function ActivityFeed() { const { activityItemId, filters, filterValues, handleActivitySelect, handleFiltersChange } = useActivityUrlState(); + const { activity, isPending, error } = usePullActivity(activityItemId); + const hasActiveFilters = Object.entries(filters).some(([key, value]) => { // Ignore dateRange as it's always present if (key === 'dateRange') return false; @@ -54,7 +62,7 @@ export function ActivityFeed() { onReset={handleClearFilters} showReset={hasChanges} /> -
+
- + + {isPending ? ( + + ) : error || !activity ? ( + + ) : ( + <> + + + + + )} + diff --git a/apps/dashboard/src/pages/create-subscriber.tsx b/apps/dashboard/src/pages/create-subscriber.tsx index 89f4bc98efd..cb310f1d60e 100644 --- a/apps/dashboard/src/pages/create-subscriber.tsx +++ b/apps/dashboard/src/pages/create-subscriber.tsx @@ -1,25 +1,35 @@ import { Sheet, SheetContent } from '@/components/primitives/sheet'; import { CreateSubscriberForm } from '@/components/subscribers/create-subscriber-form'; import { useOnElementUnmount } from '@/hooks/use-on-element-unmount'; +import { buildRoute, ROUTES } from '@/utils/routes'; import { useRef, useState } from 'react'; -import { useNavigate } from 'react-router-dom'; +import { useNavigate, useParams } from 'react-router-dom'; export function CreateSubscriberPage() { const navigate = useNavigate(); const [open, setOpen] = useState(true); const sheetRef = useRef(null); + const { environmentSlug } = useParams<{ environmentSlug: string }>(); + + const navigateToSubscribersPage = () => { + navigate( + buildRoute(ROUTES.SUBSCRIBERS, { + environmentSlug: environmentSlug ?? '', + }) + ); + }; useOnElementUnmount({ element: sheetRef.current, callback: () => { - navigate(-1); + navigateToSubscribersPage(); }, }); return ( - + - navigate(-1)} /> + ); diff --git a/apps/dashboard/src/pages/edit-subscriber-page.tsx b/apps/dashboard/src/pages/edit-subscriber-page.tsx index fb491cd2376..6ee88ca2903 100644 --- a/apps/dashboard/src/pages/edit-subscriber-page.tsx +++ b/apps/dashboard/src/pages/edit-subscriber-page.tsx @@ -1,10 +1,11 @@ import { SubscriberDrawer } from '@/components/subscribers/subscriber-drawer'; import { useOnElementUnmount } from '@/hooks/use-on-element-unmount'; +import { buildRoute, ROUTES } from '@/utils/routes'; import { useRef, useState } from 'react'; import { useNavigate, useParams } from 'react-router-dom'; export function EditSubscriberPage() { - const { subscriberId } = useParams<{ subscriberId: string }>(); + const { subscriberId, environmentSlug } = useParams<{ subscriberId: string; environmentSlug: string }>(); const navigate = useNavigate(); const [open, setOpen] = useState(true); const sheetRef = useRef(null); @@ -12,7 +13,11 @@ export function EditSubscriberPage() { useOnElementUnmount({ element: sheetRef.current, callback: () => { - navigate(-1); + navigate( + buildRoute(ROUTES.SUBSCRIBERS, { + environmentSlug: environmentSlug ?? '', + }) + ); }, }); diff --git a/apps/dashboard/src/pages/workflows.tsx b/apps/dashboard/src/pages/workflows.tsx index 79858a7a42c..a962acaaf0b 100644 --- a/apps/dashboard/src/pages/workflows.tsx +++ b/apps/dashboard/src/pages/workflows.tsx @@ -45,7 +45,7 @@ export const WorkflowsPage = () => { const navigate = useNavigate(); const [searchParams, setSearchParams] = useSearchParams({ orderDirection: DirectionEnum.DESC, - orderBy: 'updatedAt', + orderBy: 'createdAt', query: '', }); const form = useForm({ diff --git a/apps/dashboard/src/utils/telemetry.ts b/apps/dashboard/src/utils/telemetry.ts index 4e6a6478457..8e80815cf30 100644 --- a/apps/dashboard/src/utils/telemetry.ts +++ b/apps/dashboard/src/utils/telemetry.ts @@ -53,6 +53,10 @@ export enum TelemetryEvent { WORKFLOW_CHECKLIST_STEP_CLICKED = 'Workflow checklist step clicked - [Workflow Editor]', WORKFLOW_INSTRUCTIONS_OPENED = 'Workflow instructions opened - [Workflow Editor]', SUBSCRIBERS_PAGE_VISIT = 'Subscribers page visit', + SUBSCRIBER_CREATED = 'Subscriber created', + SUBSCRIBER_EDITED = 'Subscriber edited', + SUBSCRIBER_DELETED = 'Subscriber deleted', + SUBSCRIBER_PREFERENCES_UPDATED = 'Subscriber preferences updated', SIGN_UP_PAGE_VIEWED = 'Signup page viewed', SIGN_IN_PAGE_VIEWED = 'Signin page viewed', STEP_CONDITIONS_ADDED = 'Step conditions added', diff --git a/apps/worker/src/app/workflow/services/standard.worker.spec.ts b/apps/worker/src/app/workflow/services/standard.worker.spec.ts index 90dfc3e6964..1c681da6f0e 100644 --- a/apps/worker/src/app/workflow/services/standard.worker.spec.ts +++ b/apps/worker/src/app/workflow/services/standard.worker.spec.ts @@ -6,6 +6,7 @@ import { faker } from '@faker-js/faker'; import { setTimeout } from 'timers/promises'; import { + CommunityOrganizationRepository, EnvironmentEntity, JobEntity, JobRepository, @@ -117,6 +118,7 @@ describe('Standard Worker', () => { const workflowInMemoryProviderService = moduleRef.get( WorkflowInMemoryProviderService ); + const organizationRepository = moduleRef.get(CommunityOrganizationRepository); standardWorker = new StandardWorker( handleLastFailedJob, @@ -124,7 +126,8 @@ describe('Standard Worker', () => { setJobAsCompleted, setJobAsFailed, webhookFilterBackoffStrategy, - workflowInMemoryProviderService + workflowInMemoryProviderService, + organizationRepository ); }); diff --git a/apps/worker/src/app/workflow/services/standard.worker.ts b/apps/worker/src/app/workflow/services/standard.worker.ts index a858830ddfa..5b19c5ed9d2 100644 --- a/apps/worker/src/app/workflow/services/standard.worker.ts +++ b/apps/worker/src/app/workflow/services/standard.worker.ts @@ -14,6 +14,7 @@ import { WorkflowInMemoryProviderService, } from '@novu/application-generic'; +import { CommunityOrganizationRepository, CommunityUserRepository } from '@novu/dal'; import { RunJob, RunJobCommand, @@ -40,7 +41,8 @@ export class StandardWorker extends StandardWorkerService { @Inject(forwardRef(() => WebhookFilterBackoffStrategy)) private webhookFilterBackoffStrategy: WebhookFilterBackoffStrategy, @Inject(forwardRef(() => WorkflowInMemoryProviderService)) - public workflowInMemoryProviderService: WorkflowInMemoryProviderService + public workflowInMemoryProviderService: WorkflowInMemoryProviderService, + private organizationRepository: CommunityOrganizationRepository ) { super(new BullMqService(workflowInMemoryProviderService)); @@ -94,6 +96,16 @@ export class StandardWorker extends StandardWorkerService { private getWorkerProcessor() { return async ({ data }: { data: IStandardDataDto }) => { const minimalJobData = this.extractMinimalJobData(data); + const organizationExists = await this.organizationExist(data); + + if (!organizationExists) { + Logger.log( + `Organization not found for organizationId ${minimalJobData.organizationId}. Skipping job.`, + LOG_CONTEXT + ); + + return; + } Logger.verbose(`Job ${minimalJobData.jobId} is being processed in the new instance standard worker`, LOG_CONTEXT); @@ -193,4 +205,12 @@ export class StandardWorker extends StandardWorkerService { }); }; }; + + private async organizationExist(data: IStandardDataDto): Promise { + const { _organizationId } = data; + + const organization = await this.organizationRepository.findOne({ _id: _organizationId }); + + return !!organization; + } } diff --git a/apps/worker/src/app/workflow/services/subscriber-process.worker.ts b/apps/worker/src/app/workflow/services/subscriber-process.worker.ts index 7627d4d8c54..8edc6bb3085 100644 --- a/apps/worker/src/app/workflow/services/subscriber-process.worker.ts +++ b/apps/worker/src/app/workflow/services/subscriber-process.worker.ts @@ -13,6 +13,7 @@ import { IProcessSubscriberDataDto, } from '@novu/application-generic'; +import { CommunityOrganizationRepository } from '@novu/dal'; import { SubscriberJobBound } from '../usecases/subscriber-job-bound/subscriber-job-bound.usecase'; const nr = require('newrelic'); @@ -23,7 +24,8 @@ const LOG_CONTEXT = 'SubscriberProcessWorker'; export class SubscriberProcessWorker extends SubscriberProcessWorkerService { constructor( private subscriberJobBoundUsecase: SubscriberJobBound, - public workflowInMemoryProviderService: WorkflowInMemoryProviderService + public workflowInMemoryProviderService: WorkflowInMemoryProviderService, + private organizationRepository: CommunityOrganizationRepository ) { super(new BullMqService(workflowInMemoryProviderService)); @@ -32,6 +34,14 @@ export class SubscriberProcessWorker extends SubscriberProcessWorkerService { public getWorkerProcessor() { return async ({ data }: { data: IProcessSubscriberDataDto }) => { + const organizationExists = await this.organizationExist(data); + + if (!organizationExists) { + Logger.log(`Organization not found for organizationId ${data.organizationId}. Skipping job.`, LOG_CONTEXT); + + return; + } + return await new Promise((resolve, reject) => { const _this = this; @@ -64,4 +74,12 @@ export class SubscriberProcessWorker extends SubscriberProcessWorkerService { private getWorkerOpts(): WorkerOptions { return getSubscriberProcessWorkerOptions(); } + + private async organizationExist(data: IProcessSubscriberDataDto): Promise { + const { organizationId } = data; + + const organization = await this.organizationRepository.findOne({ _id: organizationId }); + + return !!organization; + } } diff --git a/apps/worker/src/app/workflow/services/workflow.worker.spec.ts b/apps/worker/src/app/workflow/services/workflow.worker.spec.ts index 0d618a7bd06..75ae1f9c319 100644 --- a/apps/worker/src/app/workflow/services/workflow.worker.spec.ts +++ b/apps/worker/src/app/workflow/services/workflow.worker.spec.ts @@ -9,6 +9,7 @@ import { WorkflowQueueService, } from '@novu/application-generic'; +import { CommunityOrganizationRepository } from '@novu/dal'; import { WorkflowWorker } from './workflow.worker'; import { WorkflowModule } from '../workflow.module'; @@ -29,7 +30,9 @@ describe('Workflow Worker', () => { const workflowInMemoryProviderService = moduleRef.get( WorkflowInMemoryProviderService ); - workflowWorker = new WorkflowWorker(triggerEventUseCase, workflowInMemoryProviderService); + const organizationRepository = moduleRef.get(CommunityOrganizationRepository); + + workflowWorker = new WorkflowWorker(triggerEventUseCase, workflowInMemoryProviderService, organizationRepository); workflowQueueService = new WorkflowQueueService(workflowInMemoryProviderService); await workflowQueueService.queue.obliterate(); diff --git a/apps/worker/src/app/workflow/services/workflow.worker.ts b/apps/worker/src/app/workflow/services/workflow.worker.ts index 86bc5319156..78bb1df02cc 100644 --- a/apps/worker/src/app/workflow/services/workflow.worker.ts +++ b/apps/worker/src/app/workflow/services/workflow.worker.ts @@ -12,6 +12,7 @@ import { WorkflowInMemoryProviderService, IWorkflowDataDto, } from '@novu/application-generic'; +import { CommunityOrganizationRepository, CommunityUserRepository } from '@novu/dal'; import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; const nr = require('newrelic'); @@ -22,7 +23,8 @@ const LOG_CONTEXT = 'WorkflowWorker'; export class WorkflowWorker extends WorkflowWorkerService { constructor( private triggerEventUsecase: TriggerEvent, - public workflowInMemoryProviderService: WorkflowInMemoryProviderService + public workflowInMemoryProviderService: WorkflowInMemoryProviderService, + private organizationRepository: CommunityOrganizationRepository ) { super(new BullMqService(workflowInMemoryProviderService)); @@ -35,6 +37,14 @@ export class WorkflowWorker extends WorkflowWorkerService { private getWorkerProcessor(): WorkerProcessor { return async ({ data }: { data: IWorkflowDataDto }) => { + const organizationExists = await this.organizationExist(data); + + if (!organizationExists) { + Logger.log(`Organization not found for organizationId ${data.organizationId}. Skipping job.`, LOG_CONTEXT); + + return; + } + return await new Promise((resolve, reject) => { const _this = this; @@ -63,4 +73,12 @@ export class WorkflowWorker extends WorkflowWorkerService { }); }; } + + private async organizationExist(data: IWorkflowDataDto): Promise { + const { organizationId } = data; + + const organization = await this.organizationRepository.findOne({ _id: organizationId }); + + return !!organization; + } } diff --git a/apps/worker/src/app/workflow/usecases/execute-bridge-job/execute-bridge-job.usecase.ts b/apps/worker/src/app/workflow/usecases/execute-bridge-job/execute-bridge-job.usecase.ts index dac025f16f2..1ce7ef5ea79 100644 --- a/apps/worker/src/app/workflow/usecases/execute-bridge-job/execute-bridge-job.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/execute-bridge-job/execute-bridge-job.usecase.ts @@ -225,6 +225,7 @@ export class ExecuteBridgeJob { message: response.message, code: response.code, data: response.data, + cause: response.cause, }), }; diff --git a/apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts b/apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts index 519c8650e48..04448aae141 100644 --- a/apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts @@ -42,7 +42,9 @@ export class RunJob { }); let job = await this.jobRepository.findOne({ _id: command.jobId, _environmentId: command.environmentId }); - if (!job) throw new PlatformException(`Job with id ${command.jobId} not found`); + if (!job) { + throw new PlatformException(`Job with id ${command.jobId} not found`); + } this.assignLogger(job); @@ -110,6 +112,15 @@ export class RunJob { await this.jobRepository.updateStatus(job._environmentId, job._id, JobStatusEnum.COMPLETED); } } catch (error: any) { + if (shouldHaltOnStepFailure(job) && !this.shouldBackoff(error)) { + await this.jobRepository.cancelPendingJobs({ + transactionId: job.transactionId, + _environmentId: job._environmentId, + _subscriberId: job._subscriberId, + _templateId: job._templateId, + }); + } + if (shouldHaltOnStepFailure(job) || this.shouldBackoff(error)) { shouldQueueNextJob = false; } @@ -136,9 +147,9 @@ export class RunJob { return; } - let shouldContinue = true; + let shouldContinueQueueNextJob = true; - while (shouldContinue) { + while (shouldContinueQueueNextJob) { try { if (!currentFailedJob) { return; @@ -161,7 +172,7 @@ export class RunJob { job: nextJob, }); - shouldContinue = false; + shouldContinueQueueNextJob = false; } catch (error: any) { if (!nextJob) { return; @@ -177,8 +188,17 @@ export class RunJob { error ); + if (shouldHaltOnStepFailure(nextJob) && !this.shouldBackoff(error)) { + await this.jobRepository.cancelPendingJobs({ + transactionId: nextJob.transactionId, + _environmentId: nextJob._environmentId, + _subscriberId: nextJob._subscriberId, + _templateId: nextJob._templateId, + }); + } + if (shouldHaltOnStepFailure(nextJob) || this.shouldBackoff(error)) { - shouldContinue = false; + shouldContinueQueueNextJob = false; throw error; } diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts index 271ea37130b..59abb3d130d 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts @@ -95,7 +95,7 @@ export class SendMessage { const stepType = command.step?.template?.type; let bridgeResponse: ExecuteOutput | null = null; - if (![StepTypeEnum.DIGEST, StepTypeEnum.DELAY, StepTypeEnum.TRIGGER].includes(stepType as StepTypeEnum)) { + if (isChannelStep(stepType)) { bridgeResponse = await this.executeBridgeJob.execute({ ...command, variables, @@ -502,3 +502,7 @@ export class SendMessage { return tenant; } } + +function isChannelStep(stepType: StepTypeEnum | undefined) { + return ![StepTypeEnum.DIGEST, StepTypeEnum.DELAY, StepTypeEnum.TRIGGER].includes(stepType as StepTypeEnum); +} diff --git a/eslint.config.mjs b/eslint.config.mjs index 361bb97d41b..ff2507773df 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -115,6 +115,7 @@ export default tsEslint.config( }, rules: { + '@typescript-eslint/await-thenable': 'warn', 'unused-imports/no-unused-imports': 'off', '@typescript-eslint/space-before-blocks': 'off', '@typescript-eslint/lines-between-class-members': 'off', diff --git a/libs/application-generic/src/services/analytics.service.ts b/libs/application-generic/src/services/analytics.service.ts index 8ff4f31bacb..33bed26b239 100644 --- a/libs/application-generic/src/services/analytics.service.ts +++ b/libs/application-generic/src/services/analytics.service.ts @@ -71,6 +71,22 @@ export class AnalyticsService { }); } + updateGroup( + userId: string, + groupId: string, + traits: Record, + ) { + if (!this.segmentEnabled) { + return; + } + + this.segment.group({ + userId, + groupId, + traits, + }); + } + alias(distinctId: string, userId: string) { if (!this.segmentEnabled) { return; diff --git a/libs/application-generic/src/usecases/execute-bridge-request/execute-bridge-request.usecase.ts b/libs/application-generic/src/usecases/execute-bridge-request/execute-bridge-request.usecase.ts index 52ae4d1d1d6..dec09cdeab4 100644 --- a/libs/application-generic/src/usecases/execute-bridge-request/execute-bridge-request.usecase.ts +++ b/libs/application-generic/src/usecases/execute-bridge-request/execute-bridge-request.usecase.ts @@ -41,30 +41,36 @@ import { BRIDGE_EXECUTION_ERROR } from '../../utils'; import { HttpRequestHeaderKeysEnum } from '../../http'; import { Instrument, InstrumentUsecase } from '../../instrumentation'; +const isTestEnv = process.env.NODE_ENV === 'test'; + export const DEFAULT_TIMEOUT = 5_000; // 5 seconds export const DEFAULT_RETRIES_LIMIT = 3; -export const RETRYABLE_HTTP_CODES: number[] = [ - 408, // Request Timeout - 429, // Too Many Requests - 500, // Internal Server Error - 503, // Service Unavailable - 504, // Gateway Timeout - // https://developers.cloudflare.com/support/troubleshooting/cloudflare-errors/troubleshooting-cloudflare-5xx-errors/ - 521, // CloudFlare web server is down - 522, // CloudFlare connection timed out - 524, // CloudFlare a timeout occurred -]; -const RETRYABLE_ERROR_CODES: string[] = [ - 'EAI_AGAIN', // DNS resolution failed, retry - 'ECONNREFUSED', // Connection refused by the server - 'ECONNRESET', // Connection was forcibly closed by a peer - 'EADDRINUSE', // Address already in use - 'EPIPE', // Broken pipe - 'ETIMEDOUT', // Operation timed out - 'ENOTFOUND', // DNS lookup failed - 'EHOSTUNREACH', // No route to host - 'ENETUNREACH', // Network is unreachable -]; +export const RETRYABLE_HTTP_CODES: number[] = isTestEnv + ? [] + : [ + 408, // Request Timeout + 429, // Too Many Requests + 500, // Internal Server Error + 503, // Service Unavailable + 504, // Gateway Timeout + // https://developers.cloudflare.com/support/troubleshooting/cloudflare-errors/troubleshooting-cloudflare-5xx-errors/ + 521, // CloudFlare web server is down + 522, // CloudFlare connection timed out + 524, // CloudFlare a timeout occurred + ]; +export const RETRYABLE_ERROR_CODES: string[] = isTestEnv + ? [] + : [ + 'EAI_AGAIN', // DNS resolution failed, retry + 'ECONNREFUSED', // Connection refused by the server + 'ECONNRESET', // Connection was forcibly closed by a peer + 'EADDRINUSE', // Address already in use + 'EPIPE', // Broken pipe + 'ETIMEDOUT', // Operation timed out + 'ENOTFOUND', // DNS lookup failed + 'EHOSTUNREACH', // No route to host + 'ENETUNREACH', // Network is unreachable + ]; const LOG_CONTEXT = 'ExecuteBridgeRequest'; @@ -144,25 +150,52 @@ export class ExecuteBridgeRequest { timeout: DEFAULT_TIMEOUT, json: command.event, retry: { - calculateDelay: ({ attemptCount, computedValue }) => { - if (computedValue === 0) { - /* - * If the computed value is 0, the retry conditions were not met and we don't want to retry. - * The retry condition is only met when the response has a `statusCodes` or `errorCodes` - * that matches the supplied retry configuration values. - * @see https://github.com/sindresorhus/got/blob/3034c2fdcebdff94907a6e015a8b154e851fc343/documentation/7-retry.md?plain=1#L130 - */ + limit: retriesLimit, + methods: ['GET', 'POST'], + statusCodes: RETRYABLE_HTTP_CODES, + errorCodes: RETRYABLE_ERROR_CODES, + calculateDelay: ({ attemptCount, error }) => { + if (attemptCount > retriesLimit) { + Logger.log( + `Exceeded retry limit of ${retriesLimit}. Stopping retries.`, + LOG_CONTEXT, + ); + return 0; } - if (attemptCount === retriesLimit) { - return 0; + // Check if the error status code is in our retryable codes + if ( + error?.response?.statusCode && + RETRYABLE_HTTP_CODES.includes(error.response.statusCode) + ) { + const delay = 2 ** attemptCount * 1000; + Logger.log( + `Retryable status code ${error.response.statusCode} detected. Retrying in ${delay}ms`, + LOG_CONTEXT, + ); + + return delay; } - return 2 ** attemptCount * 1000; + // Check if the error code is in our retryable error codes + if (error?.code && RETRYABLE_ERROR_CODES.includes(error.code)) { + const delay = 2 ** attemptCount * 1000; + Logger.log( + `Retryable error code ${error.code} detected. Retrying in ${delay}ms`, + LOG_CONTEXT, + ); + + return delay; + } + + Logger.log( + 'Error is not retryable. Stopping retry attempts.', + LOG_CONTEXT, + ); + + return 0; // Don't retry for other errors }, - statusCodes: RETRYABLE_HTTP_CODES, - errorCodes: RETRYABLE_ERROR_CODES, }, https: { /* @@ -460,7 +493,6 @@ export class ExecuteBridgeRequest { BRIDGE_EXECUTION_ERROR.UNKNOWN_BRIDGE_REQUEST_ERROR.message(url), code: BRIDGE_EXECUTION_ERROR.UNKNOWN_BRIDGE_REQUEST_ERROR.code, statusCode: HttpStatus.INTERNAL_SERVER_ERROR, - cause: error, }; } } else { @@ -474,12 +506,12 @@ export class ExecuteBridgeRequest { BRIDGE_EXECUTION_ERROR.UNKNOWN_BRIDGE_NON_REQUEST_ERROR.message(url), code: BRIDGE_EXECUTION_ERROR.UNKNOWN_BRIDGE_NON_REQUEST_ERROR.code, statusCode: HttpStatus.INTERNAL_SERVER_ERROR, - cause: error, }; } const fullBridgeError: BridgeError = { ...bridgeErrorData, + cause: error, url, }; diff --git a/libs/dal/src/repositories/job/job.repository.ts b/libs/dal/src/repositories/job/job.repository.ts index f295bc9a9a1..f5555ea8adb 100644 --- a/libs/dal/src/repositories/job/job.repository.ts +++ b/libs/dal/src/repositories/job/job.repository.ts @@ -328,4 +328,31 @@ export class JobRepository extends BaseRepository { + return this.MongooseModel.updateMany( + { + _environmentId, + _subscriberId, + _templateId, + status: JobStatusEnum.PENDING, + transactionId, + }, + { + $set: { + status: JobStatusEnum.CANCELED, + }, + } + ); + } } diff --git a/libs/dal/src/repositories/subscriber/subscriber.repository.ts b/libs/dal/src/repositories/subscriber/subscriber.repository.ts index 02d4f40073f..e69016db8a0 100644 --- a/libs/dal/src/repositories/subscriber/subscriber.repository.ts +++ b/libs/dal/src/repositories/subscriber/subscriber.repository.ts @@ -212,21 +212,37 @@ export class SubscriberRepository extends BaseRepository { + try { + await job.promote(); + } catch (error) { + // Silently handle promotion failures since job may have already executed + } +}; + export class JobsService { private jobRepository = new JobRepository(); @@ -48,7 +56,7 @@ export class JobsService { unfinishedJobs = 0, }: { templateId?: string | string[]; - organizationId: string; + organizationId?: string; delay?: boolean; unfinishedJobs?: number; }) { @@ -72,7 +80,7 @@ export class JobsService { // Wait until there are no pending, queued or running jobs in Mongo runningJobs = Math.max( await this.jobRepository.count({ - _organizationId: organizationId, + ...((organizationId ? { _organizationId: organizationId } : {}) as { _organizationId: string }), ...typeMatch, ...workflowMatch, status: { @@ -86,7 +94,39 @@ export class JobsService { public async runAllDelayedJobsImmediately() { const delayedJobs = await this.standardQueue.getDelayed(); - await delayedJobs.forEach(async (job) => job.promote()); + await Promise.all(delayedJobs.map((job) => job.promote())); + } + + public async awaitAllJobs() { + let hasMoreDelayedJobs = true; + let iterationCount = 0; + const MAX_ITERATIONS = 20; + await this.waitForJobCompletion({}); + + while (hasMoreDelayedJobs && iterationCount < MAX_ITERATIONS) { + const jobsResult = await Promise.all([ + this.standardQueue.getDelayed(), + this.workflowQueue.getDelayed(), + this.subscriberProcessQueue.getDelayed(), + ]); + const jobs = jobsResult.flat(); + + if (jobs.length === 0) { + hasMoreDelayedJobs = false; + continue; + } + + await Promise.all(jobs.map(promote)); + await this.waitForJobCompletion({}); + iterationCount += 1; + } + + if (iterationCount >= MAX_ITERATIONS) { + // eslint-disable-next-line no-console + console.warn( + 'Max iterations reached while processing delayed jobs. This might indicate an infinite loop in job creation.' + ); + } } private async getQueueMetric() { diff --git a/package.json b/package.json index 29d76a08541..12286dbcbd3 100644 --- a/package.json +++ b/package.json @@ -53,7 +53,6 @@ "start:docker:embed": "cd libs/embed && npm run start:docker", "start:docker:web": "cross-env nx run-many --target=start:docker --projects=@novu/web", "start:docker:widget": "cross-env nx run-many --target=start:docker --projects=@novu/widget", - "start:e2e:api": "cd apps/api && pnpm run test:e2e", "start:integration:api": "cd apps/api && pnpm run test", "start:node": "cross-env nx run @novu/node:start", "start:notification-center": "cross-env nx run @novu/notification-center:start", diff --git a/packages/shared/src/entities/activity-feed/activity.interface.ts b/packages/shared/src/entities/activity-feed/activity.interface.ts index eaef194b2f5..ada1e610fa9 100644 --- a/packages/shared/src/entities/activity-feed/activity.interface.ts +++ b/packages/shared/src/entities/activity-feed/activity.interface.ts @@ -24,7 +24,7 @@ export interface IActivity { tags: string[]; createdAt: string; updatedAt: string; - template: Pick; - subscriber: Pick; + template?: Pick; + subscriber?: Pick; jobs: IActivityJob[]; } diff --git a/packages/shared/src/types/feature-flags.ts b/packages/shared/src/types/feature-flags.ts index 158b77874b5..4e85493160b 100644 --- a/packages/shared/src/types/feature-flags.ts +++ b/packages/shared/src/types/feature-flags.ts @@ -52,6 +52,8 @@ export enum FeatureFlagsKeysEnum { IS_SUBSCRIBERS_PAGE_ENABLED = 'IS_SUBSCRIBERS_PAGE_ENABLED', IS_WORKFLOW_CHECK_LIST_ENABLED = 'IS_WORKFLOW_CHECK_LIST_ENABLED', IS_TIER_DURATION_RESTRICTION_EXCLUDED_ENABLED = 'IS_TIER_DURATION_RESTRICTION_EXCLUDED_ENABLED', + IS_2025_Q1_TIERING_ENABLED = 'IS_2025_Q1_TIERING_ENABLED', + IS_LEGACY_TO_2025_Q1_TIER_MIGRATION_ENABLED = 'IS_LEGACY_TO_2025_Q1_TIER_MIGRATION_ENABLED', IS_MAX_STEPS_PER_WORKFLOW_ENABLED = 'IS_MAX_STEPS_PER_WORKFLOW_ENABLED', MAX_WORKFLOW_LIMIT_NUMBER = 'MAX_WORKFLOW_LIMIT_NUMBER', } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index eeeff5bc3ed..a592b0a0be6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -86140,4 +86140,4 @@ snapshots: zwitch@2.0.4: {} - zxcvbn@4.4.2: {} + zxcvbn@4.4.2: {} \ No newline at end of file