Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Jan 30, 2025
1 parent e2bcf25 commit 6e10c5a
Show file tree
Hide file tree
Showing 21 changed files with 123 additions and 84 deletions.
2 changes: 2 additions & 0 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,8 @@ export class ChangeStream<
this.isClosed = false;
this.mode = false;

this.on('error', () => null);

// Listen for any `change` listeners being added to ChangeStream
this.on('newListener', eventName => {
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
Expand Down
3 changes: 2 additions & 1 deletion src/cmap/auth/mongodb_oidc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ export interface Workflow {
execute(
connection: Connection,
credentials: MongoCredentials,
response?: Document
response?: Document,
closeSignal: AbortSignal
): Promise<void>;

/**
Expand Down
3 changes: 2 additions & 1 deletion src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
MongoRuntimeError,
needsRetryableWriteLabel
} from '../error';
import { addAbortSignalToStream, HostAddress, ns, promiseWithResolvers } from '../utils';
import { addAbortSignalToStream, HostAddress, noop, ns, promiseWithResolvers } from '../utils';
import { AuthContext } from './auth/auth_provider';
import { AuthMechanism } from './auth/providers';
import {
Expand Down Expand Up @@ -389,6 +389,7 @@ export async function makeSocket(

addAbortSignalToStream(closeSignal, socket);

socket.unref();
socket.setKeepAlive(true, 300000);
socket.setTimeout(connectTimeoutMS);
socket.setNoDelay(noDelay);
Expand Down
8 changes: 8 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,14 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
);
}

unref() {
this.socket.unref();
}

ref() {
this.socket.ref();
}

public markAvailable(): void {
this.lastUseTime = now();
}
Expand Down
3 changes: 3 additions & 0 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
if (!this.checkedOut.has(connection)) {
return;
}

connection.unref();
const poolClosed = this.closed;
const stale = this.connectionIsStale(connection);
const willDestroy = !!(poolClosed || stale || connection.closed);
Expand Down Expand Up @@ -788,6 +790,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
);

this.waitQueue.shift();
connection.ref();
waitQueueMember.resolve(connection);
}
}
Expand Down
22 changes: 8 additions & 14 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,20 +384,13 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
}

// connecting does an implicit `hello`
(async () => {
const makeMonitoringConnection = async () => {
const socket = await makeSocket(monitor.connectOptions, monitor.closeSignal);
const connection = makeConnection(monitor.connectOptions, socket);
// The start time is after socket creation but before the handshake
start = now();
try {
await performInitialHandshake(connection, monitor.connectOptions, monitor.closeSignal);
return connection;
} catch (error) {
connection.destroy();
throw error;
}
})().then(
connection => {
if (isInCloseState(monitor)) {
connection.destroy();
return;
Expand All @@ -417,15 +410,16 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
useStreamingProtocol(monitor, connection.hello?.topologyVersion)
)
);

callback(undefined, connection.hello);
},
error => {
return connection.hello;
} catch (error) {
connection.destroy();
monitor.connection = null;
awaited = false;
onHeartbeatFailed(error);
throw error;
}
);
};

makeMonitoringConnection().then(callback.bind(undefined, undefined), onHeartbeatFailed);
}

function monitorServer(monitor: Monitor) {
Expand Down
11 changes: 10 additions & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,16 @@ export function addAbortSignalToStream(

const abortListener = addAbortListener(signal, function () {
stream.off('close', abortListener[kDispose]).off('error', abortListener[kDispose]);
stream.destroy(this.reason);
stream.destroy(
new (class extends Error {
s = stream;
})(
`sad: ${stream.___socketId}: error listeners: ${stream.listenerCount('error')} + ${stream.___stack}`,
{
cause: this.reason
}
)
);
});
// not nearly as complex as node's eos() but... do we need all that?? sobbing emoji.
stream.once('close', abortListener[kDispose]).once('error', abortListener[kDispose]);
Expand Down
16 changes: 14 additions & 2 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
MongoChangeStreamError,
type MongoClient,
MongoServerError,
promiseWithResolvers,
ReadPreference,
type ResumeToken
} from '../../mongodb';
Expand Down Expand Up @@ -62,6 +63,7 @@ describe('Change Streams', function () {
await csDb.createCollection('test').catch(() => null);
collection = csDb.collection('test');
changeStream = collection.watch();
changeStream.once('error', error => this.error(error));
});

afterEach(async () => {
Expand Down Expand Up @@ -695,10 +697,18 @@ describe('Change Streams', function () {
async test() {
await initIteratorMode(changeStream);

const { promise, resolve, reject } = promiseWithResolvers<void>();

const outStream = new PassThrough({ objectMode: true });

// @ts-expect-error: transform requires a Document return type
changeStream.stream({ transform: JSON.stringify }).pipe(outStream);
const csStream = changeStream
// @ts-expect-error: transform requires a Document return type
.stream({ transform: JSON.stringify });

csStream.once('error', reject).pipe(outStream).once('error', reject);

outStream.on('close', resolve);
csStream.on('close', resolve);

const willBeData = once(outStream, 'data');

Expand All @@ -709,6 +719,8 @@ describe('Change Streams', function () {
expect(parsedEvent).to.have.nested.property('fullDocument.a', 1);

outStream.destroy();
csStream.destroy();
await promise;
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const metadata: MongoDBMetadataUI = {
}
};

const closeSignal = new AbortController().signal;

context('Azure KMS Mock Server Tests', function () {
context('Case 1: Success', metadata, function () {
// Do not set an ``X-MongoDB-HTTP-TestParams`` header.
Expand All @@ -44,7 +46,7 @@ context('Azure KMS Mock Server Tests', function () {
// 5. The token will have a resource of ``"https://vault.azure.net"``

it('returns a properly formatted access token', async () => {
const credentials = await fetchAzureKMSToken(new KMSRequestOptions());
const credentials = await fetchAzureKMSToken(new KMSRequestOptions(), closeSignal);
expect(credentials).to.have.property('accessToken', 'magic-cookie');
});
});
Expand All @@ -59,7 +61,10 @@ context('Azure KMS Mock Server Tests', function () {
// The test case should ensure that this error condition is handled gracefully.

it('returns an error', async () => {
const error = await fetchAzureKMSToken(new KMSRequestOptions('empty-json')).catch(e => e);
const error = await fetchAzureKMSToken(
new KMSRequestOptions('empty-json'),
closeSignal
).catch(e => e);

expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
});
Expand All @@ -74,7 +79,9 @@ context('Azure KMS Mock Server Tests', function () {
// The test case should ensure that this error condition is handled gracefully.

it('returns an error', async () => {
const error = await fetchAzureKMSToken(new KMSRequestOptions('bad-json')).catch(e => e);
const error = await fetchAzureKMSToken(new KMSRequestOptions('bad-json'), closeSignal).catch(
e => e
);

expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
});
Expand All @@ -89,7 +96,9 @@ context('Azure KMS Mock Server Tests', function () {
// 2. The response body is unspecified.
// The test case should ensure that this error condition is handled gracefully.
it('returns an error', async () => {
const error = await fetchAzureKMSToken(new KMSRequestOptions('404')).catch(e => e);
const error = await fetchAzureKMSToken(new KMSRequestOptions('404'), closeSignal).catch(
e => e
);

expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
});
Expand All @@ -104,7 +113,9 @@ context('Azure KMS Mock Server Tests', function () {
// 2. The response body is unspecified.
// The test case should ensure that this error condition is handled gracefully.
it('returns an error', async () => {
const error = await fetchAzureKMSToken(new KMSRequestOptions('500')).catch(e => e);
const error = await fetchAzureKMSToken(new KMSRequestOptions('500'), closeSignal).catch(
e => e
);

expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
});
Expand All @@ -117,7 +128,9 @@ context('Azure KMS Mock Server Tests', function () {
// The HTTP response from the ``fake_azure`` server will take at least 1000 seconds
// to complete. The request should fail with a timeout.
it('returns an error after the request times out', async () => {
const error = await fetchAzureKMSToken(new KMSRequestOptions('slow')).catch(e => e);
const error = await fetchAzureKMSToken(new KMSRequestOptions('slow'), closeSignal).catch(
e => e
);

expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
});
Expand Down
6 changes: 4 additions & 2 deletions test/integration/client-side-encryption/driver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -829,12 +829,14 @@ describe('CSOT', function () {
});

describe('State machine', function () {
const stateMachine = new StateMachine({} as any);
const signal = new AbortController().signal;
const stateMachine = new StateMachine({} as any, undefined, signal);

const timeoutContext = () => ({
timeoutContext: new CSOTTimeoutContext({
timeoutMS: 1000,
serverSelectionTimeoutMS: 30000
serverSelectionTimeoutMS: 30000,
closeSignal: signal
})
});

Expand Down
1 change: 0 additions & 1 deletion test/mocha_mongodb.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
"recursive": true,
"timeout": 60000,
"failZero": true,
"reporter": "test/tools/reporter/mongodb_reporter.js",
"sort": true,
"color": true,
"ignore": [
Expand Down
4 changes: 2 additions & 2 deletions test/tools/cluster_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ SHARDED_DIR=${SHARDED_DIR:-$DATA_DIR/sharded_cluster}

if [[ $1 == "replica_set" ]]; then
mkdir -p $REPLICASET_DIR # user / password
mlaunch init --dir $REPLICASET_DIR --ipv6 --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --arbiter --name rs --port 31000 --enableMajorityReadConcern --setParameter enableTestCommands=1
echo "mongodb://bob:pwd123@localhost:31000,localhost:31001,localhost:31002/?replicaSet=rs"
mlaunch init --dir $REPLICASET_DIR --ipv6 --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --arbiter --name "repl0" --port 27017 --enableMajorityReadConcern --setParameter enableTestCommands=1
echo "mongodb://bob:pwd123@localhost:27017,localhost:27018,localhost:27019/?replicaSet=repl0"
elif [[ $1 == "sharded_cluster" ]]; then
mkdir -p $SHARDED_DIR
mlaunch init --dir $SHARDED_DIR --ipv6 --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --name rs --port 51000 --enableMajorityReadConcern --setParameter enableTestCommands=1 --sharded 1 --mongos 2
Expand Down
18 changes: 8 additions & 10 deletions test/tools/cmap_spec_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,14 @@ const compareInputToSpec = (input, expected, message) => {
expect(input, message).to.equal(expected);
};

const closeSignal = new AbortController().signal;

const getTestOpDefinitions = (threadContext: ThreadContext) => ({
checkOut: async function (op) {
const timeoutContext = TimeoutContext.create({
serverSelectionTimeoutMS: 0,
waitQueueTimeoutMS: threadContext.pool.options.waitQueueTimeoutMS
waitQueueTimeoutMS: threadContext.pool.options.waitQueueTimeoutMS,
closeSignal
});
const connection: Connection = await ConnectionPool.prototype.checkOut.call(
threadContext.pool,
Expand Down Expand Up @@ -470,8 +473,6 @@ export function runCmapTestSuite(
client: MongoClient;

beforeEach(async function () {
let utilClient: MongoClient;

const skipDescription = options?.testsToSkip?.find(
({ description }) => description === test.description
);
Expand All @@ -486,20 +487,17 @@ export function runCmapTestSuite(
}
}

if (this.configuration.isLoadBalanced) {
// The util client can always point at the single mongos LB frontend.
utilClient = this.configuration.newClient(this.configuration.singleMongosLoadBalancerUri);
} else {
utilClient = this.configuration.newClient();
}
const utilClient = this.configuration.isLoadBalanced
? this.configuration.newClient(this.configuration.singleMongosLoadBalancerUri)
: this.configuration.newClient();

await utilClient.connect();

const allRequirements = test.runOn || [];

const someRequirementMet =
!allRequirements.length ||
(await isAnyRequirementSatisfied(this.currentTest.ctx, allRequirements, utilClient));
(await isAnyRequirementSatisfied(this.currentTest.ctx, allRequirements));

if (!someRequirementMet) {
await utilClient.close();
Expand Down
2 changes: 2 additions & 0 deletions test/tools/runner/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ export class TestConfiguration {
serverApi?: ServerApi;
activeResources: number;
isSrv: boolean;
shards: { host: string }[];

constructor(
private uri: string,
Expand All @@ -103,6 +104,7 @@ export class TestConfiguration {
this.topologyType = this.isLoadBalanced ? TopologyType.LoadBalanced : context.topologyType;
this.buildInfo = context.buildInfo;
this.serverApi = context.serverApi;
this.shards = context.shards;
this.isSrv = uri.indexOf('mongodb+srv') > -1;
this.options = {
hosts,
Expand Down
2 changes: 1 addition & 1 deletion test/tools/runner/ee_checker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ events.EventEmitter = class RequireErrorListenerEventEmitter extends EventEmitte
process.nextTick(() => {
const isCS = this.constructor.name.toLowerCase().includes('ChangeStream'.toLowerCase());
if (isCS) {
// consider adding a warning.
// consider adding a warning. something related to mode === 'iterator' should skip this.
return;
}
if (this.listenerCount('error') === 0) {
Expand Down
5 changes: 5 additions & 0 deletions test/tools/runner/hooks/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ const testConfigBeforeHook = async function () {
.command({ getParameter: '*' })
.catch(error => ({ noReply: error }));

context.shards =
context.topologyType === 'sharded'
? await client.db('config').collection('shards').find({}).toArray()
: [];

this.configuration = new TestConfiguration(
loadBalanced ? SINGLE_MONGOS_LB_URI : MONGODB_URI,
context
Expand Down
6 changes: 4 additions & 2 deletions test/tools/runner/hooks/leak_checker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ const leakCheckerAfterEach = async function () {
}
};

const TRACE_SOCKETS = process.env.TRACE_SOCKETS === 'true' ? true : false;
const kSocketId = Symbol('socketId');
const TRACE_SOCKETS = true; // process.env.TRACE_SOCKETS === 'true' ? true : false;
const kSocketId = '___socketId';
const kStack = '___stack';
const originalCreateConnection = net.createConnection;
let socketCounter = 0n;

Expand All @@ -150,6 +151,7 @@ const socketLeakCheckBeforeAll = function socketLeakCheckBeforeAll() {
net.createConnection = options => {
const socket = originalCreateConnection(options);
socket[kSocketId] = socketCounter.toString().padStart(5, '0');
socket[kStack] = new Error('').stack;
socketCounter++;
return socket;
};
Expand Down
Loading

0 comments on commit 6e10c5a

Please sign in to comment.