From c6ef339a209814ede4aa00290479a406301f1f2d Mon Sep 17 00:00:00 2001 From: Tianchu Zhao Date: Wed, 11 Dec 2024 19:49:38 +0530 Subject: [PATCH] put safety check to standalone step --- bin/local-install.js | 261 +++----------------------------------- bin/safety-check.js | 88 +++++++++++++ lib/local-install-util.js | 180 ++++++++++++++++++++++++++ 3 files changed, 283 insertions(+), 246 deletions(-) create mode 100644 bin/safety-check.js create mode 100644 lib/local-install-util.js diff --git a/bin/local-install.js b/bin/local-install.js index 496c321..d2524bf 100644 --- a/bin/local-install.js +++ b/bin/local-install.js @@ -1,204 +1,7 @@ const fs = require("fs"); const path = require("path"); const execSync = require("child_process").execSync; - -const k8s = require("@kubernetes/client-node"); -const { exit } = require("process"); - -// Kube config -const kc = new k8s.KubeConfig(); -kc.loadFromDefault(); - -function skipRunningPackagesCheck(packageName, bypassSafetyCheck) { - /** - * Check if the last safe release was more than 24 hours ago. If not prevent safety check install. - */ - if (bypassSafetyCheck) { - return true; - } - - const safetyCheckFile = `/tmp/atlan-update/${packageName.replace("/", "-")}-last-safe-run.txt`; - if (!fs.existsSync(safetyCheckFile)) { - return true; - } - - const lastSafeRelease = parseInt(fs.readFileSync(safetyCheckFile, "utf-8"), 10); - const lastSafeReleaseDate = new Date(lastSafeRelease); - const now = new Date(); - const diff = now - lastSafeReleaseDate; - const diffInHours = diff / (1000 * 60 * 60); - if (diffInHours < 24) { - return false; - } - return true; -} - -function getAllPackagesMap() { - /** - * Returns a map of all packages in the packages directory - */ - const packagesMap = {}; - console.log("Reading packages from " + marketplacePackagesPath); - - const packages = fs - .readdirSync(marketplacePackagesPath, { recursive: true, withFileTypes: false }) - .filter((file) => fs.lstatSync(path.join(marketplacePackagesPath, file)).isDirectory()); - - for (const packageName of packages) { - // Skip if packageName contains node_modules - if (packageName.includes("node_modules")) { - continue; - } - - const packagePath = path.join(marketplacePackagesPath, packageName); - console.log("Reading package " + packageName + " from " + packagePath); - - const packageJSONFileExists = fs.existsSync(path.join(packagePath, "package.json")); - - if (!packageJSONFileExists) { - continue; - } - - const packageJSON = JSON.parse(fs.readFileSync(path.join(packagePath, "package.json"), "utf-8")); - - const isNumaflowPackage = fs.existsSync(path.join(packagePath, "pipelines")); - - packagesMap[packageJSON.name] = { - name: packageJSON.name, - version: packageJSON.version, - dependencies: packageJSON.dependencies, - path: packagePath, - isNumaflowPackage: isNumaflowPackage, - }; - } - console.log("Found " + Object.keys(packagesMap).length + " packages"); - return packagesMap; -} - -async function getAllRunningPackages() { - /** - * Returns a list of all packages that are currently running - */ - - // Fetch all running workflows - const workflowClient = kc.makeApiClient(k8s.CustomObjectsApi); - const workflows = await workflowClient.listNamespacedCustomObject( - "argoproj.io", - "v1alpha1", - "default", - "workflows", - undefined, - undefined, - undefined, - undefined, - "workflows.argoproj.io/phase=Running" - ); - - // For every running workflow, check which package it belongs to - const runningPackages = []; - for (const workflow of workflows.body.items) { - const package = workflow.metadata.annotations["package.argoproj.io/name"]; - if (package) { - runningPackages.push(package); - } - } - return runningPackages; -} - -async function getInstalledPackages() { - /** - * Returns a list of all packages that are currently installed on the cluster - */ - const clusterWorkflowTemplateClient = kc.makeApiClient(k8s.CustomObjectsApi); - const clusterWorkflowTemplates = await clusterWorkflowTemplateClient.listClusterCustomObject( - "argoproj.io", - "v1alpha1", - "clusterworkflowtemplates" - ); - const installedPackages = {}; - for (const clusterWorkflowTemplate of clusterWorkflowTemplates.body.items) { - if (!clusterWorkflowTemplate.metadata.annotations || !clusterWorkflowTemplate.metadata.labels) { - continue; - } - const package = clusterWorkflowTemplate.metadata.annotations["package.argoproj.io/name"]; - const packageVersion = clusterWorkflowTemplate.metadata.labels["package.argoproj.io/version"]; - if (package && packageVersion) { - installedPackages[package] = packageVersion; - } - } - console.log("Installed packages: " + Object.keys(installedPackages).join(", ")); - return installedPackages; -} - -function getPackagesToInstall(packageName, packagesMap, installedPackages, skipVersionCheck, snapshotInstall) { - /** - * Returns a list of all packages that need to be installed - */ - var packagesToInstall = new Set(); - const package = packagesMap[packageName]; - if (!package) { - throw new Error(`Package ${packageName} not found`); - } - - const snapshotInstallSuffix = "-snapshot"; - - for (const dependency of Object.keys(package.dependencies)) { - let dependencyPackage = packagesMap[dependency]; - if (!dependencyPackage) { - throw new Error(`Dependency ${dependency} not found`); - } - - if (snapshotInstall) { - if (!dependencyPackage.version.endsWith(snapshotInstallSuffix)) { - dependencyPackage.version = dependencyPackage.version + snapshotInstallSuffix; - } - packagesToInstall.add(dependencyPackage); - } - - if (!installedPackages[dependencyPackage.name] || dependencyPackage.isNumaflowPackage) { - packagesToInstall.add(dependencyPackage); - } - - if (skipVersionCheck || installedPackages[dependencyPackage.name] !== dependencyPackage.version) { - packagesToInstall.add(dependencyPackage); - } - - if (dependencyPackage.dependencies) { - const dependencyPackagesToInstall = getPackagesToInstall( - dependency, - packagesMap, - installedPackages, - skipVersionCheck, - snapshotInstall - ); - packagesToInstall = new Set([...packagesToInstall, ...dependencyPackagesToInstall]); - } - } - return packagesToInstall; -} - -function getConnectorPackages() { - //All the connector packages don't have dependency to @atlan/connectors - //If changes for canary are present in canary deployment, and the crawler is running, then we have to stop installation of @atlan/connectors package - //Hence if any of these are running, we have to skip the installation of @atlan/connectors package. - - //Read all the packages - //Check for isVerified, isCertified - //Check for type miner, utility and return for custom, connectors etc. - const packages = fs - .readdirSync(marketplacePackagesPath, { recursive: true, withFileTypes: false }) - .filter((file) => file.endsWith("package.json")) - .map((file) => JSON.parse(fs.readFileSync(path.join(marketplacePackagesPath, file), "utf-8"))) - .filter((pkg) => pkg.config?.labels?.["orchestration.atlan.com/certified"] === "true") - .filter( - (pkg) => - pkg.config?.labels?.["orchestration.atlan.com/type"] !== "miner" && - pkg.config?.labels?.["orchestration.atlan.com/type"] !== "utility" - ) - .map((pkg) => pkg.name); - - return packages; -} +const { getPackagesToInstall, getInstalledPackages, getAllPackagesMap } = require("../lib/local-install-util"); function installPackages(packages, extraArgs, azureArtifacts) { // Install packages @@ -224,16 +27,16 @@ function installPackages(packages, extraArgs, azureArtifacts) { } async function run( + marketplacePackagesPath, packageName, azureArtifacts, - bypassSafetyCheck, extraArgs, channel, snapshotInstall, skipVersionCheck, skipPackages ) { - const packagesMap = getAllPackagesMap(); + const packagesMap = getAllPackagesMap(marketplacePackagesPath); const installedPackages = await getInstalledPackages(); const initPackagesToInstall = getPackagesToInstall( @@ -261,44 +64,11 @@ async function run( // Always install numaflow packages since delete-pipelines may have deleted them const numaflowPackages = [...packagesToInstall].filter((pkg) => pkg.isNumaflowPackage); - const connectorPackages = [...packagesToInstall].find((pkg) => "@atlan/connectors" === pkg.name) - ? getConnectorPackages() - : []; if (packageName != "@atlan/cloud-packages") { console.log("Numaflow packages to install: " + numaflowPackages.map((pkg) => pkg.name).join(", ")); installPackages(numaflowPackages, extraArgs, azureArtifacts); } - var safeToInstall = true; - if (!skipRunningPackagesCheck(packageName, bypassSafetyCheck)) { - // Check if running workflows have packages that need to be installed - const runningPackages = await getAllRunningPackages(); - console.log("Running packages: " + runningPackages.join(", ")); - const packagesToInstallNames = Array.from(packagesToInstall).map((pkg) => pkg.name); - for (const runningPackage of runningPackages) { - if (packagesToInstallNames.includes(runningPackage)) { - safeToInstall = false; - break; - } - if (connectorPackages.includes(runningPackage)) { - //If any of the connector packages are running, then we have to skip the installation of @atlan/connectors package. - safeToInstall = false; - console.log( - `Connector package ${runningPackage} is running. Skipping installation of @atlan/connectors package` - ); - break; - } - } - } - console.log("Safe to install: " + safeToInstall); - - if (!safeToInstall) { - console.warn("Not safe to install. Waiting for running workflows to complete before installing packages."); - // use custom exit code 100 to bypass workflow failure - // choose code 100 to avoid collision https://node.readthedocs.io/en/latest/api/process/ - exit(100); - } - // Install packages const argoPackages = [...packagesToInstall].filter((pkg) => !pkg.isNumaflowPackage); console.log("Argo packages to install: " + argoPackages.map((pkg) => pkg.name).join(", ")); @@ -306,33 +76,32 @@ async function run( installPackages(argoPackages, extraArgs, azureArtifacts, snapshotInstall); // Write last safe release - fs.writeFileSync( - `/tmp/atlan-update/${packageName.replace("/", "-")}-last-safe-run.txt`, - `${Math.floor(new Date().getTime())}` - ); + const filePath = `/tmp/atlan-update/${packageName.replace("/", "-")}-last-safe-run.txt`; + const dirPath = path.dirname(filePath); + fs.mkdirSync(dirPath, { recursive: true }); + fs.writeFileSync(filePath, `${Math.floor(new Date().getTime())}`); } // Take package name as input const marketplacePackagesPath = process.argv[2]; const packageName = process.argv[3]; const azureArtifacts = process.argv[4]; -const bypassSafetyCheckString = process.argv[5]; -const extraArgs = process.argv[6]; -const channel = process.argv[7]; +const extraArgs = process.argv[5]; +const channel = process.argv[6]; // snapshotInstall install package regardless of package version -// It respects bypassSafetyCheck, and added a -snapshot suffix to the version -const snapshotInstallString = process.argv[8]; -const skipVersionCheckString = process.argv[9]; -const skipPackages = process.argv[10]; +// It adds a -snapshot suffix to the version +const snapshotInstallString = process.argv[7]; +const skipVersionCheckString = process.argv[8]; +const skipPackagesString = process.argv[9]; -const bypassSafetyCheck = bypassSafetyCheckString === "true"; const snapshotInstall = snapshotInstallString === "true"; const skipVersionCheck = skipVersionCheckString === "true"; +const skipPackages = skipPackagesString.startsWith("[") ? skipPackagesString : "[]"; run( + marketplacePackagesPath, packageName, azureArtifacts, - bypassSafetyCheck, extraArgs, channel, snapshotInstall, diff --git a/bin/safety-check.js b/bin/safety-check.js new file mode 100644 index 0000000..9bc0f3d --- /dev/null +++ b/bin/safety-check.js @@ -0,0 +1,88 @@ +const { exit } = require("process"); +const fs = require("fs"); +const { + getAllRunningPackages, + getPackagesToInstall, + getInstalledPackages, + getAllPackagesMap, + getConnectorPackages, +} = require("../lib/local-install-util"); + +function skipRunningPackagesCheck(packageName) { + /** + * Check if the last safe release was more than 24 hours ago. If not prevent safety check install. + */ + + const safetyCheckFile = `/tmp/atlan-update/${packageName.replace("/", "-")}-last-safe-run.txt`; + if (!fs.existsSync(safetyCheckFile)) { + return true; + } + + const lastSafeRelease = parseInt(fs.readFileSync(safetyCheckFile, "utf-8"), 10); + const lastSafeReleaseDate = new Date(lastSafeRelease); + const now = new Date(); + const diff = now - lastSafeReleaseDate; + const diffInHours = diff / (1000 * 60 * 60); + if (diffInHours < 24) { + return false; + } + return true; +} + +async function run(marketplacePackagesPath, packageName, snapshotInstall, skipVersionCheck) { + if (snapshotInstall) { + console.log("snapshot install, safe to proceed"); + exit(0); + } + const packagesMap = getAllPackagesMap(marketplacePackagesPath); + const installedPackages = await getInstalledPackages(); + console.log(123); + const packagesToInstall = getPackagesToInstall( + packageName, + packagesMap, + installedPackages, + skipVersionCheck, + snapshotInstall + ); + var safeToInstall = true; + if (!skipRunningPackagesCheck(packageName)) { + // Check if running workflows have packages that need to be installed + const runningPackages = await getAllRunningPackages(); + console.log("Running packages: " + runningPackages.join(", ")); + const packagesToInstallNames = Array.from(packagesToInstall).map((pkg) => pkg.name); + const connectorPackages = [...packagesToInstall].find((pkg) => "@atlan/connectors" === pkg.name) + ? getConnectorPackages(marketplacePackagesPath) + : []; + for (const runningPackage of runningPackages) { + if (packagesToInstallNames.includes(runningPackage)) { + safeToInstall = false; + break; + } + if (connectorPackages.includes(runningPackage)) { + //If any of the connector packages are running, then we have to skip the installation of @atlan/connectors package. + safeToInstall = false; + console.log( + `Connector package ${runningPackage} is running. Skipping installation of @atlan/connectors package` + ); + break; + } + } + } + console.log("Safe to install: " + safeToInstall); + + if (!safeToInstall) { + console.warn("Not safe to install. Waiting for running workflows to complete before installing packages."); + // use custom exit code 100 to bypass workflow failure + // choose code 100 to avoid collision https://node.readthedocs.io/en/latest/api/process/ + exit(100); + } +} + +const marketplacePackagesPath = process.argv[2]; +const packageName = process.argv[3]; +const snapshotInstallString = process.argv[4]; +const skipVersionCheckString = process.argv[5]; +const snapshotInstall = snapshotInstallString === "true"; +const skipVersionCheck = skipVersionCheckString === "true"; + +run(marketplacePackagesPath, packageName, snapshotInstall, skipVersionCheck); diff --git a/lib/local-install-util.js b/lib/local-install-util.js new file mode 100644 index 0000000..a9d38d1 --- /dev/null +++ b/lib/local-install-util.js @@ -0,0 +1,180 @@ +const fs = require("fs"); +const path = require("path"); +const k8s = require("@kubernetes/client-node"); + +// Kube config +const kc = new k8s.KubeConfig(); +kc.loadFromDefault(); + +async function getAllRunningPackages() { + /** + * Returns a list of all packages that are currently running + */ + + // Fetch all running workflows + const workflowClient = kc.makeApiClient(k8s.CustomObjectsApi); + const workflows = await workflowClient.listNamespacedCustomObject( + "argoproj.io", + "v1alpha1", + "default", + "workflows", + undefined, + undefined, + undefined, + undefined, + "workflows.argoproj.io/phase=Running" + ); + + // For every running workflow, check which package it belongs to + const runningPackages = []; + for (const workflow of workflows.body.items) { + const package = workflow.metadata.annotations["package.argoproj.io/name"]; + if (package) { + runningPackages.push(package); + } + } + return runningPackages; +} + +async function getInstalledPackages() { + /** + * Returns a list of all packages that are currently installed on the cluster + */ + const clusterWorkflowTemplateClient = kc.makeApiClient(k8s.CustomObjectsApi); + const clusterWorkflowTemplates = await clusterWorkflowTemplateClient.listClusterCustomObject( + "argoproj.io", + "v1alpha1", + "clusterworkflowtemplates" + ); + const installedPackages = {}; + for (const clusterWorkflowTemplate of clusterWorkflowTemplates.body.items) { + if (!clusterWorkflowTemplate.metadata.annotations || !clusterWorkflowTemplate.metadata.labels) { + continue; + } + const package = clusterWorkflowTemplate.metadata.annotations["package.argoproj.io/name"]; + const packageVersion = clusterWorkflowTemplate.metadata.labels["package.argoproj.io/version"]; + if (package && packageVersion) { + installedPackages[package] = packageVersion; + } + } + console.log("Installed packages: " + Object.keys(installedPackages).join(", ")); + return installedPackages; +} + +function getPackagesToInstall(packageName, packagesMap, installedPackages, skipVersionCheck, snapshotInstall) { + /** + * Returns a list of all packages that need to be installed + */ + var packagesToInstall = new Set(); + const package = packagesMap[packageName]; + if (!package) { + throw new Error(`Package ${packageName} not found`); + } + + const snapshotInstallSuffix = "-snapshot"; + + for (const dependency of Object.keys(package.dependencies)) { + let dependencyPackage = packagesMap[dependency]; + if (!dependencyPackage) { + throw new Error(`Dependency ${dependency} not found`); + } + + if (snapshotInstall) { + if (!dependencyPackage.version.endsWith(snapshotInstallSuffix)) { + dependencyPackage.version = dependencyPackage.version + snapshotInstallSuffix; + } + packagesToInstall.add(dependencyPackage); + } + + if (!installedPackages[dependencyPackage.name] || dependencyPackage.isNumaflowPackage) { + packagesToInstall.add(dependencyPackage); + } + + if (skipVersionCheck || installedPackages[dependencyPackage.name] !== dependencyPackage.version) { + packagesToInstall.add(dependencyPackage); + } + + if (dependencyPackage.dependencies) { + const dependencyPackagesToInstall = getPackagesToInstall( + dependency, + packagesMap, + installedPackages, + skipVersionCheck, + snapshotInstall + ); + packagesToInstall = new Set([...packagesToInstall, ...dependencyPackagesToInstall]); + } + } + return packagesToInstall; +} + +function getAllPackagesMap(marketplacePackagesPath) { + /** + * Returns a map of all packages in the packages directory + */ + const packagesMap = {}; + console.log("Reading packages from " + marketplacePackagesPath); + + const packages = fs + .readdirSync(marketplacePackagesPath, { recursive: true, withFileTypes: false }) + .filter((file) => fs.lstatSync(path.join(marketplacePackagesPath, file)).isDirectory()); + + for (const packageName of packages) { + // Skip if packageName contains node_modules + if (packageName.includes("node_modules")) { + continue; + } + + const packagePath = path.join(marketplacePackagesPath, packageName); + console.log("Reading package " + packageName + " from " + packagePath); + + const packageJSONFileExists = fs.existsSync(path.join(packagePath, "package.json")); + + if (!packageJSONFileExists) { + continue; + } + + const packageJSON = JSON.parse(fs.readFileSync(path.join(packagePath, "package.json"), "utf-8")); + + const isNumaflowPackage = fs.existsSync(path.join(packagePath, "pipelines")); + + packagesMap[packageJSON.name] = { + name: packageJSON.name, + version: packageJSON.version, + dependencies: packageJSON.dependencies, + path: packagePath, + isNumaflowPackage: isNumaflowPackage, + }; + } + console.log("Found " + Object.keys(packagesMap).length + " packages"); + return packagesMap; +} + +function getConnectorPackages(marketplacePackagesPath) { + //All the connector packages don't have dependency to @atlan/connectors + //If changes for canary are present in canary deployment, and the crawler is running, then we have to stop installation of @atlan/connectors package + //Hence if any of these are running, we have to skip the installation of @atlan/connectors package. + + //Read all the packages + //Check for isVerified, isCertified + //Check for type miner, utility and return for custom, connectors etc. + const packages = fs + .readdirSync(marketplacePackagesPath, { recursive: true, withFileTypes: false }) + .filter((file) => file.endsWith("package.json")) + .map((file) => JSON.parse(fs.readFileSync(path.join(marketplacePackagesPath, file), "utf-8"))) + .filter((pkg) => pkg.config?.labels?.["orchestration.atlan.com/certified"] === "true") + .filter( + (pkg) => + pkg.config?.labels?.["orchestration.atlan.com/type"] !== "miner" && + pkg.config?.labels?.["orchestration.atlan.com/type"] !== "utility" + ) + .map((pkg) => pkg.name); + + return packages; +} + +exports.getAllRunningPackages = getAllRunningPackages; +exports.getPackagesToInstall = getPackagesToInstall; +exports.getInstalledPackages = getInstalledPackages; +exports.getAllPackagesMap = getAllPackagesMap; +exports.getConnectorPackages = getConnectorPackages;