fix(server/objects): solves database connection resource leak when client connections are prematurely closed (#3889)
iainsproat authored Jan 28, 2025
1 parent fc634eb commit 32c310f
Showing 8 changed files with 297 additions and 12 deletions.
69 changes: 57 additions & 12 deletions packages/server/modules/core/rest/diffDownload.ts
@@ -2,7 +2,7 @@ import zlib from 'zlib'
import { corsMiddleware } from '@/modules/core/configs/cors'
import type { Application } from 'express'
import { SpeckleObjectsStream } from '@/modules/core/rest/speckleObjectsStream'
import { pipeline, PassThrough } from 'stream'
import { Duplex, PassThrough, pipeline } from 'stream'
import { getObjectsStreamFactory } from '@/modules/core/repositories/objects'
import { db } from '@/db/knex'
import { validatePermissionsReadStreamFactory } from '@/modules/core/services/streams/auth'
@@ -11,6 +11,11 @@ import { authorizeResolver, validateScopes } from '@/modules/shared'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import { UserInputError } from '@/modules/core/errors/userinput'
import { ensureError } from '@speckle/shared'
import chain from 'stream-chain'
import { get } from 'lodash'
import { getFeatureFlags } from '@/modules/shared/helpers/envHelper'

const { FF_OBJECTS_STREAMING_FIX } = getFeatureFlags()

export default (app: Application) => {
const validatePermissionsReadStream = validatePermissionsReadStreamFactory({
@@ -56,14 +61,22 @@ export default (app: Application) => {
const speckleObjStream = new SpeckleObjectsStream(simpleText)
const gzipStream = zlib.createGzip()

pipeline(
speckleObjStream,
gzipStream,
new PassThrough({ highWaterMark: 16384 * 31 }),
res,
(err) => {
let chainPipeline: Duplex

if (FF_OBJECTS_STREAMING_FIX) {
// From node documentation: https://nodejs.org/docs/latest-v18.x/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback
// > stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.
// As workaround, we are using chain from 'stream-chain'
// Some more conversation around this: https://stackoverflow.com/questions/61072482/node-closing-streams-properly-after-pipeline
chainPipeline = chain([
speckleObjStream,
gzipStream,
new PassThrough({ highWaterMark: 16384 * 31 }),
res
])
chainPipeline.on('error', (err) => {
if (err) {
switch (err.code) {
switch (get(err, 'code')) {
case 'ERR_STREAM_PREMATURE_CLOSE':
req.log.info({ err }, 'Stream to client has prematurely closed')
break
@@ -79,10 +92,38 @@
childCount: childrenList.length,
mbWritten: gzipStream.bytesWritten / 1000000
},
'Streamed {childCount} objects (size: {mbWritten} MB)'
'Encountered error. Prior to error, we streamed {childCount} objects (size: {mbWritten} MB)'
)
}
)
})
} else {
pipeline(
speckleObjStream,
gzipStream,
new PassThrough({ highWaterMark: 16384 * 31 }),
res,
(err) => {
if (err) {
switch (err.code) {
case 'ERR_STREAM_PREMATURE_CLOSE':
req.log.info({ err }, 'Stream to client has prematurely closed')
break
default:
req.log.error(err, 'App error streaming objects')
break
}
return
}

req.log.info(
{
childCount: childrenList.length,
mbWritten: gzipStream.bytesWritten / 1000000
},
'Streamed {childCount} objects (size: {mbWritten} MB)'
)
}
)
}

const cSize = 1000
try {
@@ -102,7 +143,11 @@
})

await new Promise((resolve, reject) => {
dbStream.pipe(speckleObjStream, { end: false })
if (FF_OBJECTS_STREAMING_FIX) {
dbStream.pipe(chainPipeline, { end: false })
} else {
dbStream.pipe(speckleObjStream, { end: false })
}
dbStream.once('end', resolve)
dbStream.once('error', reject)
})
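For context on the pattern this file switches to (behind FF_OBJECTS_STREAMING_FIX): stream.pipeline() removes its listeners once its callback has fired, whereas chain() from 'stream-chain' returns a single Duplex whose 'error' listener stays attached for the life of the pipeline, and the database stream is piped into that Duplex rather than into an intermediate stream. The sketch below is a minimal, self-contained illustration of that pattern; the source stream, the stages and the teardown in the error handler are stand-ins, not the exact Speckle implementation.

```ts
// Minimal sketch of the stream-chain pattern used in diffDownload.ts above.
// The source stream, the stages and the teardown are illustrative stand-ins.
import zlib from 'zlib'
import { PassThrough, Readable } from 'stream'
import chain from 'stream-chain'

// Stand-in for the knex query stream that feeds SpeckleObjectsStream.
const sourceStream = Readable.from(['{"id":"a"}\n', '{"id":"b"}\n'])

// chain() wires the stages into one Duplex and keeps its 'error' listener
// attached for the whole lifetime of the pipeline, unlike stream.pipeline(),
// which leaves dangling listeners after its callback has been invoked.
const chainPipeline = chain([
  zlib.createGzip(),
  new PassThrough({ highWaterMark: 16384 * 31 })
])

chainPipeline.on('error', (err) => {
  // e.g. ERR_STREAM_PREMATURE_CLOSE when the client disconnects mid-response;
  // tearing the source down here is one way to release its underlying resources.
  sourceStream.destroy()
  console.error('streaming pipeline failed', err)
})

// As in the fix, the source is piped into the chain itself ({ end: false }
// mirrors the batched streaming in diffDownload.ts), and the chain is ended
// explicitly once the source has finished.
sourceStream.pipe(chainPipeline, { end: false })
chainPipeline.pipe(process.stdout)
sourceStream.once('end', () => chainPipeline.end())
```

The intent, per the comments in the diff above, is that a premature client close is now observed by the chain's error handler instead of being swallowed and leaving the upstream database stream (and its pooled connection) dangling.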
215 changes: 215 additions & 0 deletions (new test file)
@@ -0,0 +1,215 @@
/**
* Tests for regression of issue where closing client connections prematurely caused the database connection never to be closed (zombie)
*/
import { db } from '@/db/knex'
import {
createRandomEmail,
createRandomPassword
} from '@/modules/core/helpers/testHelpers'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import {
createUserEmailFactory,
ensureNoPrimaryEmailForUserFactory,
findEmailFactory
} from '@/modules/core/repositories/userEmails'
import {
countAdminUsersFactory,
legacyGetUserFactory,
storeUserAclFactory,
storeUserFactory
} from '@/modules/core/repositories/users'
import { validateAndCreateUserEmailFactory } from '@/modules/core/services/userEmails'
import { createUserFactory } from '@/modules/core/services/users/management'
import { deleteOldAndInsertNewVerificationFactory } from '@/modules/emails/repositories'
import { renderEmail } from '@/modules/emails/services/emailRendering'
import { sendEmail } from '@/modules/emails/services/sending'
import { requestNewEmailVerificationFactory } from '@/modules/emails/services/verification/request'
import {
deleteServerOnlyInvitesFactory,
updateAllInviteTargetsFactory
} from '@/modules/serverinvites/repositories/serverInvites'
import { finalizeInvitedServerRegistrationFactory } from '@/modules/serverinvites/services/processing'
import { beforeEachContext, initializeTestServer } from '@/test/hooks'
import { BasicTestStream, createTestStream } from '@/test/speckle-helpers/streamHelper'
import { createPersonalAccessTokenFactory } from '@/modules/core/services/tokens'
import {
storeApiTokenFactory,
storePersonalApiTokenFactory,
storeTokenResourceAccessDefinitionsFactory,
storeTokenScopesFactory
} from '@/modules/core/repositories/tokens'
import { Scopes } from '@speckle/shared'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { generateManyObjects } from '@/test/helpers'
import { RawSpeckleObject } from '@/modules/core/domain/objects/types'
import { createObjectsBatchedFactory } from '@/modules/core/services/objects/management'
import {
storeClosuresIfNotFoundFactory,
storeObjectsIfNotFoundFactory
} from '@/modules/core/repositories/objects'
import { expect } from 'chai'
import { getFeatureFlags } from '@/modules/shared/helpers/envHelper'

const getServerInfo = getServerInfoFactory({ db })
const getUser = legacyGetUserFactory({ db })
const requestNewEmailVerification = requestNewEmailVerificationFactory({
findEmail: findEmailFactory({ db }),
getUser,
getServerInfo,
deleteOldAndInsertNewVerification: deleteOldAndInsertNewVerificationFactory({ db }),
renderEmail,
sendEmail
})

const createUserEmail = validateAndCreateUserEmailFactory({
createUserEmail: createUserEmailFactory({ db }),
ensureNoPrimaryEmailForUser: ensureNoPrimaryEmailForUserFactory({ db }),
findEmail: findEmailFactory({ db }),
updateEmailInvites: finalizeInvitedServerRegistrationFactory({
deleteServerOnlyInvites: deleteServerOnlyInvitesFactory({ db }),
updateAllInviteTargets: updateAllInviteTargetsFactory({ db })
}),
requestNewEmailVerification
})

const findEmail = findEmailFactory({ db })
const createUser = createUserFactory({
getServerInfo,
findEmail,
storeUser: storeUserFactory({ db }),
countAdminUsers: countAdminUsersFactory({ db }),
storeUserAcl: storeUserAclFactory({ db }),
validateAndCreateUserEmail: createUserEmail,
emitEvent: getEventBus().emit
})

const createPersonalAccessToken = createPersonalAccessTokenFactory({
storeApiToken: storeApiTokenFactory({ db }),
storeTokenScopes: storeTokenScopesFactory({ db }),
storeTokenResourceAccessDefinitions: storeTokenResourceAccessDefinitionsFactory({
db
}),
storePersonalApiToken: storePersonalApiTokenFactory({ db })
})
const createObjectsBatched = createObjectsBatchedFactory({
storeObjectsIfNotFoundFactory: storeObjectsIfNotFoundFactory({ db }),
storeClosuresIfNotFound: storeClosuresIfNotFoundFactory({ db })
})

const { FF_OBJECTS_STREAMING_FIX } = getFeatureFlags()

describe('Objects REST @core', () => {
let serverAddress: string
before(async () => {
const ctx = await beforeEachContext()
;({ serverAddress } = await initializeTestServer(ctx))
})
;(FF_OBJECTS_STREAMING_FIX ? it : it.skip)(
'should close database connections if client connection is prematurely closed',
async () => {
const userId = await createUser({
name: 'emails user',
email: createRandomEmail(),
password: createRandomPassword()
})
const user = await getUser(userId)

const project = {
id: '',
name: 'test project',
ownerId: userId
}
await createTestStream(project as unknown as BasicTestStream, user)

const token = `Bearer ${await createPersonalAccessToken(
user.id,
'test token user A',
[
Scopes.Streams.Read,
Scopes.Streams.Write,
Scopes.Users.Read,
Scopes.Users.Email,
Scopes.Tokens.Write,
Scopes.Tokens.Read,
Scopes.Profile.Read,
Scopes.Profile.Email
]
)}`

const manyObjs: { commit: RawSpeckleObject; objs: RawSpeckleObject[] } =
generateManyObjects(3333, 'perlin merlin magic')
const objsIds = manyObjs.objs.map((o) => o.id)

await createObjectsBatched({ streamId: project.id, objects: manyObjs.objs })
for (let i = 0; i < 4; i++) {
forceCloseStreamingConnection({
serverAddress,
projectId: project.id,
token,
objsIds
})
}

//sleep for a bit to allow the server to close the connections
await new Promise((r) => setTimeout(r, 3000))
const gaugeContents = await determineRemainingDatabaseConnectionCapacity({
serverAddress
})
expect(parseInt(gaugeContents), gaugeContents).to.gte(4) //expect all connections to become available again after the client closes them
}
)
})

const forceCloseStreamingConnection = async (params: {
serverAddress: string
projectId: string
token: string
objsIds: (string | undefined)[]
}) => {
const { serverAddress, projectId, token, objsIds } = params
const controller = new AbortController()
const signal = controller.signal

const stream = await fetch(`${serverAddress}/api/getobjects/${projectId}`, {
signal,
method: 'POST',
headers: {
Authorization: token,
'Content-Type': 'application/json'
},
body: JSON.stringify({
objects: JSON.stringify(objsIds)
})
})

const partiallyGetBodyStreamThenCloseConnection = async () => {
const reader = stream.body?.getReader({ mode: 'byob' })
const buffer = new ArrayBuffer(1) //tiny buffer of 1 byte
await reader?.read(new Uint8Array(buffer, 0, buffer.byteLength)) // read first byte into our tiny buffer
controller.abort('force closing the connection') //immediately abort the connection
}
await partiallyGetBodyStreamThenCloseConnection()
}

const determineRemainingDatabaseConnectionCapacity = async (params: {
serverAddress: string
}): Promise<string> => {
const { serverAddress } = params
const metricsResponse = await fetch(`${serverAddress}/metrics`, {
method: 'GET'
})
const metricBody = await metricsResponse.text()
const match = [
...metricBody.matchAll(/(^speckle_server_knex_remaining_capacity.*)\}\s([\d]+)$/gm)
]
if (!match) {
expect(match).not.to.be.null
return '' //HACK force correct type below
}
const gaugeContents = match[0][2] //second capture group of the first & only match gives the gauge value
if (!gaugeContents) {
expect(gaugeContents).not.to.be.null
return '' //HACK force correct type below
}
return gaugeContents
}
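The assertion in this test scrapes the server's Prometheus /metrics endpoint and extracts the knex remaining-capacity gauge with the regex above. As a hedged illustration of what that match operates on (the label set on the real metric is an assumption; only the metric name and the trailing gauge value matter to the regex):

```ts
// Hedged illustration of the /metrics text the test's regex matches against.
// The label set on speckle_server_knex_remaining_capacity is an assumption.
const sampleMetricsBody = [
  '# TYPE speckle_server_knex_remaining_capacity gauge',
  'speckle_server_knex_remaining_capacity{region="main"} 4'
].join('\n')

const matches = [
  ...sampleMetricsBody.matchAll(
    /(^speckle_server_knex_remaining_capacity.*)\}\s([\d]+)$/gm
  )
]

// The second capture group of the first match is the gauge value, i.e. how
// many database connections are still available in the pool.
console.log(matches[0][2]) // "4"
```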
1 change: 1 addition & 0 deletions packages/server/package.json
@@ -115,6 +115,7 @@
"response-time": "^2.3.2",
"sanitize-html": "^2.7.1",
"sharp": "^0.32.6",
"stream-chain": "^3.4.0",
"string-pixel-width": "^1.10.0",
"stripe": "^17.1.0",
"subscriptions-transport-ws": "^0.11.0",
6 changes: 6 additions & 0 deletions packages/shared/src/environment/index.ts
@@ -65,6 +65,11 @@ const parseFeatureFlags = () => {
FF_FORCE_ONBOARDING: {
schema: z.boolean(),
defaults: { production: false, _: false }
},
// Fixes the streaming of objects by ensuring that the database stream is closed properly
FF_OBJECTS_STREAMING_FIX: {
schema: z.boolean(),
defaults: { production: false, _: false }
}
})

@@ -92,6 +97,7 @@ export function getFeatureFlags(): {
FF_FILEIMPORT_IFC_DOTNET_ENABLED: boolean
FF_FORCE_EMAIL_VERIFICATION: boolean
FF_FORCE_ONBOARDING: boolean
FF_OBJECTS_STREAMING_FIX: boolean
} {
if (!parsedFlags) parsedFlags = parseFeatureFlags()
return parsedFlags
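Judging from the naming convention here and the Helm template changes below, the new flag is presumably driven by an FF_OBJECTS_STREAMING_FIX environment variable. A minimal, hedged sketch of how a boolean flag declared this way might be resolved (the real parseFeatureFlags() validates against the zod schema and per-environment defaults and may differ in detail):

```ts
// Hedged sketch of boolean feature-flag resolution; not the actual
// parseFeatureFlags() implementation from packages/shared.
import { z } from 'zod'

const resolveBooleanFlag = (name: string, defaultValue: boolean): boolean => {
  const raw = process.env[name]
  if (raw === undefined || raw === '') return defaultValue
  // Accept the usual truthy spellings, then validate the final value.
  return z.boolean().parse(['1', 'true', 'yes'].includes(raw.toLowerCase()))
}

const FF_OBJECTS_STREAMING_FIX = resolveBooleanFlag('FF_OBJECTS_STREAMING_FIX', false)
console.log({ FF_OBJECTS_STREAMING_FIX })
```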
3 changes: 3 additions & 0 deletions utils/helm/speckle-server/templates/_helpers.tpl
@@ -595,6 +595,9 @@ Generate the environment variables for Speckle server and Speckle objects deploy
- name: FF_FORCE_ONBOARDING
value: {{ .Values.featureFlags.forceOnboarding | quote }}

- name: FF_OBJECTS_STREAMING_FIX
value: {{ .Values.featureFlags.objectsStreamingFix | quote }}

{{- if .Values.featureFlags.billingIntegrationEnabled }}
- name: STRIPE_API_KEY
valueFrom:
5 changes: 5 additions & 0 deletions utils/helm/speckle-server/values.schema.json
@@ -94,6 +94,11 @@
"type": "boolean",
"description": "Forces onboarding for all users",
"default": false
},
"objectsStreamingFix": {
"type": "boolean",
"description": "Enables the fix for the objects streaming issue when client prematurely closes the connection",
"default": false
}
}
},
2 changes: 2 additions & 0 deletions utils/helm/speckle-server/values.yaml
@@ -59,6 +59,8 @@ featureFlags:
forceEmailVerification: false
## @param featureFlags.forceOnboarding Forces onboarding for all users
forceOnboarding: false
## @param featureFlags.objectsStreamingFix Enables the fix for the objects streaming issue when client prematurely closes the connection
objectsStreamingFix: false

analytics:
## @param analytics.enabled Enable or disable analytics