Skip to content

Commit

Permalink
Merge branch 'next' into nv-5161-implement-workflow-and-step-limits
Browse files Browse the repository at this point in the history
# Conflicts:
#	.source
#	packages/shared/src/types/feature-flags.ts
  • Loading branch information
djabarovgeorge committed Feb 13, 2025
2 parents cf785ca + c6ae39e commit 6bd2664
Show file tree
Hide file tree
Showing 64 changed files with 1,259 additions and 539 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/check-only.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
name: Check for .only flags

on:
pull_request:
branches: [ "**" ]

jobs:
check-only:
name: Check for .only in tests
runs-on: ubuntu-latest
timeout-minutes: 5

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Check for .only flags
run: |
chmod +x .github/workflows/scripts/stop-only.sh
.github/workflows/scripts/stop-only.sh .
29 changes: 29 additions & 0 deletions .github/workflows/scripts/stop-only.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/bin/bash

# Define the search directory (default to current directory)
SEARCH_DIR=${1:-.}

# Find all matching test files and search for ".only"
echo "🔍 Searching for '.only' in test files"

# Search for .only patterns and store results
FOUND_FILES=$(grep -r "it.only\|describe.only\|test.only" "$SEARCH_DIR" \
--include="*.e2e.ts" \
--include="*.e2e-ee.ts" \
--include="*.spec.ts" \
--include="*.test.ts" \
--exclude-dir={node_modules,dist,build} \
-n | cut -d ":" -f 1,2)

# Check if any files were found
if [ -n "$FOUND_FILES" ]; then
echo ""
echo "🥵 Found '.only' in the following files:"
echo "$FOUND_FILES"
echo ""
echo "🧹🧹🧹 Please remove '.only' before committing!"
exit 1
else
echo "✅ No '.only' found in test files."
exit 0
fi
60 changes: 52 additions & 8 deletions apps/api/e2e/setup.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
import { testServer } from '@novu/testing';
import { testServer, TestingQueueService, JobsService } from '@novu/testing';
import sinon from 'sinon';
import chai from 'chai';
import { default as mongoose } from 'mongoose';
import mongoose from 'mongoose';
import { JobRepository } from '@novu/dal';
import { JobTopicNameEnum } from '@novu/shared';
import { bootstrap } from '../src/bootstrap';

const jobRepository = new JobRepository();
const workflowQueue = new TestingQueueService(JobTopicNameEnum.WORKFLOW).queue;
const standardQueue = new TestingQueueService(JobTopicNameEnum.STANDARD).queue;
const subscriberProcessQueue = new TestingQueueService(JobTopicNameEnum.PROCESS_SUBSCRIBER).queue;

let connection: typeof mongoose;

async function getConnection() {
if (!connection) {
connection = await mongoose.connect(process.env.MONGO_URL);
}

return connection;
}

async function dropDatabase() {
try {
await mongoose.connect(process.env.MONGO_URL);
await mongoose.connection.db.dropDatabase();
const conn = await getConnection();
await conn.connection.db.dropDatabase();
} catch (error) {
console.error('Error dropping the database:', error);
} finally {
await mongoose.disconnect();
}
}

Expand All @@ -27,9 +42,38 @@ before(async () => {
after(async () => {
await testServer.teardown();
await dropDatabase();
if (connection) {
await connection.disconnect();
}
});

// TODO: Remove this
afterEach(() => {
async function cleanup() {
const jobsService = new JobsService();
await jobsService.runAllDelayedJobsImmediately();
await jobsService.awaitAllJobs();

await Promise.all([workflowQueue.drain(), standardQueue.drain(), subscriberProcessQueue.drain()]);

await jobRepository._model.deleteMany({});
}

function timeoutPromise(ms: number) {
// eslint-disable-next-line no-promise-executor-return
return new Promise((resolve) => setTimeout(resolve, ms));
}

afterEach(async function () {
const TIMEOUT = 4500;
sinon.restore();

try {
await Promise.race([
cleanup(),
timeoutPromise(TIMEOUT).then(() => {
console.warn('Cleanup operation timed out after 5000ms - continuing with tests');
}),
]);
} catch (error) {
console.error('Error during cleanup:', error);
}
});
6 changes: 6 additions & 0 deletions apps/api/src/app/analytics/analytics.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ export class AnalyticsController {
jobTitle: body.jobTitle,
});

this.analyticsService.updateGroup(user._id, user.organizationId, {
organizationType: body.organizationType,
companySize: body.companySize,
jobTitle: body.jobTitle,
});

await this.hubspotIdentifyFormUsecase.execute(
HubspotIdentifyFormCommand.create({
email: user.email as string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ const verifyCustomerMock = {
],
};

describe('webhook event - customer.subscription.deleted #novu-v2', () => {
describe.skip('webhook event - customer.subscription.deleted #novu-v2', () => {
const eeBilling = require('@novu/ee-billing');
if (!eeBilling) {
throw new Error('ee-billing does not exist');
Expand Down
15 changes: 13 additions & 2 deletions apps/api/src/app/billing/e2e/get-prices.e2e-ee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@ describe('GetPrices #novu-v2', () => {
},
};
let listPricesStub: sinon.SinonStub;
let getFeatureFlagStub: { execute: sinon.SinonStub };
const IS_2025_Q1_TIERING_ENABLED = true;

beforeEach(() => {
getFeatureFlagStub = {
execute: sinon.stub().resolves(IS_2025_Q1_TIERING_ENABLED),
};

listPricesStub = stripeStub.prices.list;
listPricesStub.onFirstCall().resolves({
data: [{ id: 'licensed_price_id_1' }],
Expand All @@ -32,17 +38,22 @@ describe('GetPrices #novu-v2', () => {

afterEach(() => {
listPricesStub.reset();
getFeatureFlagStub.execute.reset();
});

const createUseCase = () => new GetPrices(stripeStub as any);
const createUseCase = () => new GetPrices(stripeStub, getFeatureFlagStub);

const freeMeteredPriceLookupKey = IS_2025_Q1_TIERING_ENABLED
? ['free_usage_notifications_10k']
: ['free_usage_notifications'];

const expectedPrices = [
{
apiServiceLevel: ApiServiceLevelEnum.FREE,
billingInterval: StripeBillingIntervalEnum.MONTH,
prices: {
licensed: ['free_flat_monthly'],
metered: ['free_usage_notifications'],
metered: freeMeteredPriceLookupKey,
},
},
{
Expand Down
49 changes: 45 additions & 4 deletions apps/api/src/app/events/e2e/bridge-trigger.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import axios from 'axios';
import { expect } from 'chai';
import sinon from 'sinon';

import { SubscribersService, UserSession } from '@novu/testing';
import { JobsService, SubscribersService, TestingQueueService, UserSession } from '@novu/testing';
import {
ExecutionDetailsRepository,
JobRepository,
Expand All @@ -15,10 +15,10 @@ import {
CreateWorkflowDto,
ExecutionDetailsStatusEnum,
JobStatusEnum,
JobTopicNameEnum,
MessagesStatusEnum,
StepTypeEnum,
WorkflowCreationSourceEnum,
WorkflowOriginEnum,
WorkflowResponseDto,
} from '@novu/shared';
import { workflow } from '@novu/framework';
Expand All @@ -44,8 +44,21 @@ contexts.forEach((context: Context) => {
let subscriber: SubscriberEntity;
let subscriberService: SubscribersService;
const executionDetailsRepository = new ExecutionDetailsRepository();
const jobsService = new JobsService();
let bridge;

const printJobsState = async (prefix: string) => {
const count = await Promise.all([
jobRepository.count({} as any),
new TestingQueueService(JobTopicNameEnum.WORKFLOW).queue.getWaitingCount(),
new TestingQueueService(JobTopicNameEnum.PROCESS_SUBSCRIBER).queue.getWaitingCount(),
new TestingQueueService(JobTopicNameEnum.STANDARD).queue.getWaitingCount(),
]);

// eslint-disable-next-line no-console
console.log(`${prefix} Jobs state `, count);
};

beforeEach(async () => {
bridgeServer = new BridgeServer();
bridge = context.isStateful ? undefined : { url: `${bridgeServer.serverPath}/novu` };
Expand Down Expand Up @@ -564,8 +577,17 @@ contexts.forEach((context: Context) => {
await discoverAndSyncBridge(session, workflowsRepository, workflowId, bridgeServer);
}

await printJobsState('before triggerEvent');
await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge);
await printJobsState('after triggerEvent');

await session.runAllDelayedJobsImmediately();
await printJobsState('after runAllDelayedJobsImmediately');

await session.waitForJobCompletion();
await printJobsState('after waitForJobCompletion');

await session.runAllDelayedJobsImmediately();
await session.waitForJobCompletion();

const messagesAfter = await messageRepository.find({
Expand Down Expand Up @@ -598,7 +620,7 @@ contexts.forEach((context: Context) => {
await bridgeServer.start({ workflows: [exceedMaxTierDurationWorkflow] });

if (context.isStateful) {
await discoverAndSyncBridge(session, workflowsRepository, workflowId, bridgeServer);
await discoverAndSyncBridge(session, workflowsRepository, exceedMaxTierDurationWorkflowId, bridgeServer);
}

const result = await triggerEvent(session, exceedMaxTierDurationWorkflowId, subscriber.subscriberId, {}, bridge);
Expand All @@ -615,6 +637,7 @@ contexts.forEach((context: Context) => {
});

it(`should trigger the bridge workflow with control default and payload data [${context.name}]`, async () => {
await printJobsState('test init');
const workflowId = `default-payload-params-workflow-${`${context.name}`}`;
const newWorkflow = workflow(
workflowId,
Expand Down Expand Up @@ -655,10 +678,28 @@ contexts.forEach((context: Context) => {
await discoverAndSyncBridge(session, workflowsRepository, workflowId, bridgeServer);
}

await printJobsState('before trigger 1');

await triggerEvent(session, workflowId, subscriber.subscriberId, {}, bridge);

await session.waitForJobCompletion();

await printJobsState('after trigger 1');

await triggerEvent(session, workflowId, subscriber.subscriberId, { name: 'payload_name' }, bridge);
await session.waitForJobCompletion();

await printJobsState('after trigger 2');

await jobsService.awaitAllJobs();

await printJobsState('before sleep');

// eslint-disable-next-line no-promise-executor-return
await new Promise((resolve) => setTimeout(resolve, 100));

await printJobsState('after sleep');

await jobsService.awaitAllJobs();

const sentMessage = await messageRepository.find({
_environmentId: session.environment._id,
Expand Down
5 changes: 3 additions & 2 deletions apps/api/src/app/events/e2e/digest-events.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
JobEntity,
} from '@novu/dal';
import { StepTypeEnum, DigestTypeEnum, DigestUnitEnum, IDigestRegularMetadata } from '@novu/shared';
import { UserSession, SubscribersService } from '@novu/testing';
import { UserSession, SubscribersService, JobsService } from '@novu/testing';

const axiosInstance = axios.create();

Expand All @@ -26,6 +26,7 @@ describe('Trigger event - Digest triggered events - /v1/events/trigger (POST) #n
let subscriberService: SubscribersService;
const jobRepository = new JobRepository();
const messageRepository = new MessageRepository();
const jobsService = new JobsService();

const triggerEvent = async (payload, transactionId?: string): Promise<void> => {
await axiosInstance.post(
Expand Down Expand Up @@ -255,7 +256,7 @@ describe('Trigger event - Digest triggered events - /v1/events/trigger (POST) #n
const mergedJobs = jobs.filter((elem) => elem.status !== JobStatusEnum.DELAYED);
expect(mergedJobs && mergedJobs.length).to.eql(1);

await session.waitForJobCompletion(template?._id, false, 1);
await jobsService.awaitAllJobs();

const finalJobs = await jobRepository.find({
_environmentId: session.environment._id,
Expand Down
20 changes: 19 additions & 1 deletion apps/api/src/app/events/e2e/trigger-event.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
SubscriberRepository,
TenantRepository,
} from '@novu/dal';
import { SubscribersService, UserSession, WorkflowOverrideService } from '@novu/testing';
import { SubscribersService, TestingQueueService, UserSession, WorkflowOverrideService } from '@novu/testing';
import {
ActorTypeEnum,
ChannelTypeEnum,
Expand All @@ -30,6 +30,7 @@ import {
FilterPartTypeEnum,
IEmailBlock,
InAppProviderIdEnum,
JobTopicNameEnum,
PreviousStepTypeEnum,
SmsProviderIdEnum,
StepTypeEnum,
Expand Down Expand Up @@ -68,6 +69,18 @@ describe('Trigger event - /v1/events/trigger (POST) #novu-v2', function () {
const tenantRepository = new TenantRepository();
let novuClient: Novu;

const printJobsState = async (prefix: string) => {
const count = await Promise.all([
jobRepository.count({} as any),
new TestingQueueService(JobTopicNameEnum.WORKFLOW).queue.getWaitingCount(),
new TestingQueueService(JobTopicNameEnum.PROCESS_SUBSCRIBER).queue.getWaitingCount(),
new TestingQueueService(JobTopicNameEnum.STANDARD).queue.getWaitingCount(),
]);

// eslint-disable-next-line no-console
console.log(`${prefix} Jobs state `, count);
};

beforeEach(async () => {
session = new UserSession();
await session.initialize();
Expand Down Expand Up @@ -126,6 +139,7 @@ describe('Trigger event - /v1/events/trigger (POST) #novu-v2', function () {
],
});

await printJobsState('before triggerEvent');
await novuClient.trigger({
workflowId: template.triggers[0].identifier,
to: [subscriber.subscriberId],
Expand All @@ -134,8 +148,12 @@ describe('Trigger event - /v1/events/trigger (POST) #novu-v2', function () {
},
});

await printJobsState('after triggerEvent');

await session.waitForJobCompletion(template?._id, true, 0);

await printJobsState('after waitForJobCompletion');

const messagesAfter = await messageRepository.find({
_environmentId: session.environment._id,
_subscriberId: subscriber._id,
Expand Down
Loading

0 comments on commit 6bd2664

Please sign in to comment.