Skip to content

Commit

Permalink
Merge pull request #115 from FlowFuse/fix-creation-race-114
Browse files Browse the repository at this point in the history
Run promises in order
  • Loading branch information
knolleary authored Oct 18, 2023
2 parents 951cdc3 + 53932b6 commit f418b0d
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 74 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ driver:
projectNamespace: flowforge
cloudProvider: aws
privateCA: ff-ca-certs
k8sDelay: 1000
k8sRetries: 10
```
- `registry` is the Docker Registry to load Stack Containers from
Expand All @@ -26,6 +28,8 @@ should run on
- `cloudProvider` can be left unset for non-`aws` deployments. Setting it to `aws` triggers the addition of
AWS EKS-specific annotations for ALB Ingress.
- `privateCA` (optional) is the name of the ConfigMap holding the PEM CA cert bundle (file name `certs.pem`)
- `k8sRetries` is how many times to retry actions against the K8s API (defaults to `10`)
- `k8sDelay` is how long to wait (in ms) between retries to the K8s API (defaults to `1000`)

Expects to pick up K8s credentials from the environment

Expand Down
249 changes: 177 additions & 72 deletions kubernetes.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const got = require('got')
const k8s = require('@kubernetes/client-node')
const _ = require('lodash')

/**
* Kubernetes Container driver
Expand Down Expand Up @@ -378,63 +379,119 @@ const createProject = async (project, options) => {
const localService = await createService(project, options)
const localIngress = await createIngress(project, options)

const promises = []
promises.push(this._k8sAppApi.createNamespacedDeployment(namespace, localDeployment).catch(err => {
this._app.log.error(`[k8s] Project ${project.id} - error creating deployment: ${err.toString()}`)
this._app.log.error(`[k8s] deployment ${JSON.stringify(localDeployment, undefined, 2)}`)
this._app.log.error(err)
// rethrow the error so the wrapper knows this hasn't worked
throw err
}))
/* eslint n/handle-callback-err: "off" */
promises.push(this._k8sApi.createNamespacedService(namespace, localService).catch(err => {
// TODO: This will fail if the service already exists, which is okay if
// we're restarting a suspended project. As we don't know if we're restarting
// or not, we don't know if this is fatal or not.

// Once we can know if this is a restart or create, then we can decide
// whether to throw this error or not. For now, this will silently
// let it pass
//
if (project.state !== 'suspended') {
this._app.log.error(`[k8s] Project ${project.id} - error creating service: ${err.toString()}`)
try {
await this._k8sAppApi.createNamespacedDeployment(namespace, localDeployment)
} catch (err) {
if (err.statusCode === 409) {
// If deployment exists, perform an upgrade
this._app.log.warn(`[k8s] Deployment for project ${project.id} already exists. Upgrading deployment`)
const result = await this._k8sAppApi.readNamespacedDeployment(project.safeName, namespace)

const existingDeployment = result.body
// Check if the metadata and spec are aligned. They won't be, though (at a minimum because we regenerate auth)
if (!_.isEqual(existingDeployment.metadata, localDeployment.metadata) || !_.isEqual(existingDeployment.spec, localDeployment.spec)) {
// If not aligned, replace the deployment
await this._k8sAppApi.replaceNamespacedDeployment(project.safeName, namespace, localDeployment)
}
} else {
// Log other errors and rethrow them for additional higher-level handling
this._app.log.error(`[k8s] Unexpected error creating deployment for project ${project.id}.`)
this._app.log.error(`[k8s] deployment ${JSON.stringify(localDeployment, undefined, 2)}`)
this._app.log.error(err)
// rethrow the error so the wrapper knows this hasn't worked
throw err
}
// throw err
}))

// if (project.changedName) {
// promises.push(this._k8sNetApi.replaceNamespacedIngress(project.safeName,namespace, localIngress)).catch(err => {
// this._app.log.error(`[k8s] Project ${project.id} - error updating ingress: ${err.toString()}`)
// }).then (async () => {
// this._app.log.info(`[k8s] Ingress for project ${project.id} updated`)
// })
// } else {
promises.push(this._k8sNetApi.createNamespacedIngress(namespace, localIngress).catch(err => {
// TODO: This will fail if the ingress already exists, which is okay if
// we're restarting a suspended project. As we don't know if we're restarting
// or not, we don't know if this is fatal or not.

// Once we can know if this is a restart or create, then we can decide
// whether to throw this error or not. For now, this will silently
// let it pass
//
if (project.state !== 'suspended') {
this._app.log.error(`[k8s] Project ${project.id} - error creating ingress: ${err.toString()}`)
}

await new Promise((resolve, reject) => {
let counter = 0
const pollInterval = setInterval(async () => {
try {
await this._k8sAppApi.readNamespacedDeployment(project.safeName, this._namespace)
clearInterval(pollInterval)
resolve()
} catch (err) {
// hmm
counter++
if (counter > this._k8sRetries) {
clearInterval(pollInterval)
this._app.log.error(`[k8s] Project ${project.id} - timeout waiting for Deployment`)
reject(new Error('Timed out to creating Deployment'))
}
}
}, this._k8sDelay)
})

try {
await this._k8sApi.createNamespacedService(namespace, localService)
} catch (err) {
if (err.statusCode === 409) {
this._app.log.warn(`[k8s] Service for project ${project.id} already exists, proceeding...`)
} else {
if (project.state !== 'suspended') {
this._app.log.error(`[k8s] Project ${project.id} - error creating service: ${err.toString()}`)
throw err
}
}
// throw err
}).then(async () => {
this._app.log.info(`[k8s] Ingress creation completed for project ${project.id}`)
}))
// }
}

await project.updateSetting('k8sType', 'deployment')
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
await new Promise((resolve, reject) => {
let counter = 0
const pollInterval = setInterval(async () => {
try {
await this._k8sApi.readNamespacedService(prefix + project.safeName, this._namespace)
clearInterval(pollInterval)
resolve()
} catch (err) {
counter++
if (counter > this._k8sRetries) {
clearInterval(pollInterval)
this._app.log.error(`[k8s] Project ${project.id} - timeout waiting for Service`)
reject(new Error('Timed out to creating Service'))
}
}
}, this._k8sDelay)
})

try {
await this._k8sNetApi.createNamespacedIngress(namespace, localIngress)
} catch (err) {
if (err.statusCode === 409) {
this._app.log.warn(`[k8s] Ingress for project ${project.id} already exists, proceeding...`)
} else {
if (project.state !== 'suspended') {
this._app.log.error(`[k8s] Project ${project.id} - error creating ingress: ${err.toString()}`)
throw err
}
}
}

return Promise.all(promises).then(async () => {
this._app.log.debug(`[k8s] Container ${project.id} started`)
project.state = 'running'
await project.save()
this._projects[project.id].state = 'starting'
await new Promise((resolve, reject) => {
let counter = 0
const pollInterval = setInterval(async () => {
try {
await this._k8sNetApi.readNamespacedIngress(project.safeName, this._namespace)
clearInterval(pollInterval)
resolve()
} catch (err) {
counter++
if (counter > this._k8sRetries) {
clearInterval(pollInterval)
this._app.log.error(`[k8s] Project ${project.id} - timeout waiting for Ingress`)
reject(new Error('Timed out to creating Ingress'))
}
}
}, this._k8sDelay)
})

await project.updateSetting('k8sType', 'deployment')

this._app.log.debug(`[k8s] Container ${project.id} started`)
project.state = 'running'
await project.save()

this._projects[project.id].state = 'starting'
}

// eslint-disable-next-line no-unused-vars
Expand Down Expand Up @@ -534,6 +591,8 @@ module.exports = {
this._options = options

this._namespace = this._app.config.driver.options.projectNamespace || 'flowforge'
this._k8sDelay = this._app.config.driver.options.k8sDelay || 1000
this._k8sRetries = this._app.config.driver.options.k8sRetries || 10

const kc = new k8s.KubeConfig()

Expand Down Expand Up @@ -569,7 +628,7 @@ module.exports = {

this._initialCheckTimeout = setTimeout(() => {
this._app.log.debug('[k8s] Restarting projects')
const namespace = options.projectNamespace || 'flowforge'
const namespace = this._namespace
projects.forEach(async (project) => {
try {
if (project.state === 'suspended') {
Expand Down Expand Up @@ -608,7 +667,8 @@ module.exports = {
await this._k8sAppApi.readNamespacedDeployment(project.safeName, namespace)
this._app.log.info(`[k8s] deployment ${project.id} in ${namespace} found`)
} catch (err) {
this._app.log.debug(`[k8s] Project ${project.id} - recreating deployment`)
this._app.log.error(`[k8s] Error while reading namespaced deployment for project '${project.safeName}' ${project.id}. Error msg=${err.message}, stack=${err.stack}`)
this._app.log.info(`[k8s] Project ${project.id} - recreating deployment`)
const fullProject = await this._app.db.models.Project.byId(project.id)
await createProject(fullProject, options)
}
Expand Down Expand Up @@ -684,29 +744,69 @@ module.exports = {
* @param {Project} project - the project model instance
*/
stop: async (project) => {
// Stop the project, but don't remove all of its resources.
// Stop the project
this._projects[project.id].state = 'stopping'

try {
await this._k8sNetApi.deleteNamespacedIngress(project.safeName, this._namespace)
} catch (err) {
this._app.log.error(`[k8s] Project ${project.id} - error deleting ingress: ${err.toString()}`)
}
if (project.safeName.match(/^[0-9]/)) {
try {
await this._k8sApi.deleteNamespacedService('srv-' + project.safeName, this._namespace)
} catch (err) {
this._app.log.error(`[k8s] Project ${project.id} - error deleting service: ${err.toString()}`)
}
} else {
try {
await this._k8sApi.deleteNamespacedService(project.safeName, this._namespace)
} catch (err) {
this._app.log.error(`[k8s] Project ${project.id} - error deleting service: ${err.toString()}`)
}

// Note that, regardless of the outcome above, the main objective is to delete the deployment (the runnable).
// Even if some k8s resources like the ingress or service are still not deleted (maybe because of
// k8s service latency), the most important thing is to get as far as deleting the deployment.
try {
await new Promise((resolve, reject) => {
let counter = 0
const pollInterval = setInterval(async () => {
try {
await this._k8sNetApi.readNamespacedIngress(project.safeName, this._namespace)
} catch (err) {
clearInterval(pollInterval)
resolve()
}
counter++
if (counter > this._k8sRetries) {
clearInterval(pollInterval)
this._app.log.error(`[k8s] Project ${project.id} - timed out deleting ingress`)
reject(new Error('Timed out to deleting Ingress'))
}
}, this._k8sDelay)
})
} catch (err) {
this._app.log.error(`[k8s] Project ${project.id} - Ingress was not deleted: ${err.toString()}`)
}

const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
try {
await this._k8sApi.deleteNamespacedService(prefix + project.safeName, this._namespace)
} catch (err) {
this._app.log.error(`[k8s] Project ${project.id} - error deleting service: ${err.toString()}`)
}

try {
await new Promise((resolve, reject) => {
let counter = 0
const pollInterval = setInterval(async () => {
try {
await this._k8sApi.readNamespacedService(prefix + project.safeName, this._namespace)
} catch (err) {
clearInterval(pollInterval)
resolve()
}
counter++
if (counter > this._k8sRetries) {
clearInterval(pollInterval)
this._app.log.error(`[k8s] Project ${project.id} - timed deleting service`)
reject(new Error('Timed out to deleting Service'))
}
}, this._k8sDelay)
})
} catch (err) {
this._app.log.error(`[k8s] Project ${project.id} - Service was not deleted: ${err.toString()}`)
}

// For now, we just want to remove the Pod/Deployment
const currentType = await project.getSetting('k8sType')
let pod = true
if (currentType === 'deployment') {
Expand All @@ -717,19 +817,26 @@ module.exports = {
}

this._projects[project.id].state = 'suspended'
return new Promise(resolve => {
return new Promise((resolve, reject) => {
let counter = 0
const pollInterval = setInterval(async () => {
try {
if (pod) {
await this._k8sApi.readNamespacedPodStatus(project.safeName, this._namespace)
} else {
await this._k8sAppApi.readNamespacedDeployment(project.safeName, this._namespace)
}
counter++
if (counter > this._k8sRetries) {
clearInterval(pollInterval)
this._app.log.error(`[k8s] Project ${project.id} - timed deleting ${pod ? 'Pod' : 'Deployment'}`)
reject(new Error('Timed out to deleting Deployment'))
}
} catch (err) {
clearInterval(pollInterval)
resolve()
}
}, 1000)
}, this._k8sDelay)
})
},

Expand All @@ -739,8 +846,6 @@ module.exports = {
* @return {Object}
*/
remove: async (project) => {
// let project = await this._app.db.models.Project.byId(id)

try {
await this._k8sNetApi.deleteNamespacedIngress(project.safeName, this._namespace)
} catch (err) {
Expand Down
13 changes: 12 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
"license": "Apache-2.0",
"dependencies": {
"@kubernetes/client-node": "^0.18.1",
"got": "^11.8.0"
"got": "^11.8.0",
"lodash": "^4.17.21"
},
"devDependencies": {
"eslint": "^8.25.0",
Expand Down

0 comments on commit f418b0d

Please sign in to comment.