Skip to content

Commit

Permalink
put safety check to standalone step
Browse files Browse the repository at this point in the history
  • Loading branch information
tczhao committed Dec 11, 2024
1 parent f2dd2e8 commit c6ef339
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 246 deletions.
261 changes: 15 additions & 246 deletions bin/local-install.js
Original file line number Diff line number Diff line change
@@ -1,204 +1,7 @@
const fs = require("fs");
const path = require("path");
const execSync = require("child_process").execSync;

const k8s = require("@kubernetes/client-node");
const { exit } = require("process");

// Kube config
const kc = new k8s.KubeConfig();
kc.loadFromDefault();

function skipRunningPackagesCheck(packageName, bypassSafetyCheck) {
/**
* Check if the last safe release was more than 24 hours ago. If not prevent safety check install.
*/
if (bypassSafetyCheck) {
return true;
}

const safetyCheckFile = `/tmp/atlan-update/${packageName.replace("/", "-")}-last-safe-run.txt`;
if (!fs.existsSync(safetyCheckFile)) {
return true;
}

const lastSafeRelease = parseInt(fs.readFileSync(safetyCheckFile, "utf-8"), 10);
const lastSafeReleaseDate = new Date(lastSafeRelease);
const now = new Date();
const diff = now - lastSafeReleaseDate;
const diffInHours = diff / (1000 * 60 * 60);
if (diffInHours < 24) {
return false;
}
return true;
}

function getAllPackagesMap() {
/**
* Returns a map of all packages in the packages directory
*/
const packagesMap = {};
console.log("Reading packages from " + marketplacePackagesPath);

const packages = fs
.readdirSync(marketplacePackagesPath, { recursive: true, withFileTypes: false })
.filter((file) => fs.lstatSync(path.join(marketplacePackagesPath, file)).isDirectory());

for (const packageName of packages) {
// Skip if packageName contains node_modules
if (packageName.includes("node_modules")) {
continue;
}

const packagePath = path.join(marketplacePackagesPath, packageName);
console.log("Reading package " + packageName + " from " + packagePath);

const packageJSONFileExists = fs.existsSync(path.join(packagePath, "package.json"));

if (!packageJSONFileExists) {
continue;
}

const packageJSON = JSON.parse(fs.readFileSync(path.join(packagePath, "package.json"), "utf-8"));

const isNumaflowPackage = fs.existsSync(path.join(packagePath, "pipelines"));

packagesMap[packageJSON.name] = {
name: packageJSON.name,
version: packageJSON.version,
dependencies: packageJSON.dependencies,
path: packagePath,
isNumaflowPackage: isNumaflowPackage,
};
}
console.log("Found " + Object.keys(packagesMap).length + " packages");
return packagesMap;
}

async function getAllRunningPackages() {
/**
* Returns a list of all packages that are currently running
*/

// Fetch all running workflows
const workflowClient = kc.makeApiClient(k8s.CustomObjectsApi);
const workflows = await workflowClient.listNamespacedCustomObject(
"argoproj.io",
"v1alpha1",
"default",
"workflows",
undefined,
undefined,
undefined,
undefined,
"workflows.argoproj.io/phase=Running"
);

// For every running workflow, check which package it belongs to
const runningPackages = [];
for (const workflow of workflows.body.items) {
const package = workflow.metadata.annotations["package.argoproj.io/name"];
if (package) {
runningPackages.push(package);
}
}
return runningPackages;
}

async function getInstalledPackages() {
/**
* Returns a list of all packages that are currently installed on the cluster
*/
const clusterWorkflowTemplateClient = kc.makeApiClient(k8s.CustomObjectsApi);
const clusterWorkflowTemplates = await clusterWorkflowTemplateClient.listClusterCustomObject(
"argoproj.io",
"v1alpha1",
"clusterworkflowtemplates"
);
const installedPackages = {};
for (const clusterWorkflowTemplate of clusterWorkflowTemplates.body.items) {
if (!clusterWorkflowTemplate.metadata.annotations || !clusterWorkflowTemplate.metadata.labels) {
continue;
}
const package = clusterWorkflowTemplate.metadata.annotations["package.argoproj.io/name"];
const packageVersion = clusterWorkflowTemplate.metadata.labels["package.argoproj.io/version"];
if (package && packageVersion) {
installedPackages[package] = packageVersion;
}
}
console.log("Installed packages: " + Object.keys(installedPackages).join(", "));
return installedPackages;
}

function getPackagesToInstall(packageName, packagesMap, installedPackages, skipVersionCheck, snapshotInstall) {
/**
* Returns a list of all packages that need to be installed
*/
var packagesToInstall = new Set();
const package = packagesMap[packageName];
if (!package) {
throw new Error(`Package ${packageName} not found`);
}

const snapshotInstallSuffix = "-snapshot";

for (const dependency of Object.keys(package.dependencies)) {
let dependencyPackage = packagesMap[dependency];
if (!dependencyPackage) {
throw new Error(`Dependency ${dependency} not found`);
}

if (snapshotInstall) {
if (!dependencyPackage.version.endsWith(snapshotInstallSuffix)) {
dependencyPackage.version = dependencyPackage.version + snapshotInstallSuffix;
}
packagesToInstall.add(dependencyPackage);
}

if (!installedPackages[dependencyPackage.name] || dependencyPackage.isNumaflowPackage) {
packagesToInstall.add(dependencyPackage);
}

if (skipVersionCheck || installedPackages[dependencyPackage.name] !== dependencyPackage.version) {
packagesToInstall.add(dependencyPackage);
}

if (dependencyPackage.dependencies) {
const dependencyPackagesToInstall = getPackagesToInstall(
dependency,
packagesMap,
installedPackages,
skipVersionCheck,
snapshotInstall
);
packagesToInstall = new Set([...packagesToInstall, ...dependencyPackagesToInstall]);
}
}
return packagesToInstall;
}

function getConnectorPackages() {
//All the connector packages don't have dependency to @atlan/connectors
//If changes for canary are present in canary deployment, and the crawler is running, then we have to stop installation of @atlan/connectors package
//Hence if any of these are running, we have to skip the installation of @atlan/connectors package.

//Read all the packages
//Check for isVerified, isCertified
//Check for type miner, utility and return for custom, connectors etc.
const packages = fs
.readdirSync(marketplacePackagesPath, { recursive: true, withFileTypes: false })
.filter((file) => file.endsWith("package.json"))
.map((file) => JSON.parse(fs.readFileSync(path.join(marketplacePackagesPath, file), "utf-8")))
.filter((pkg) => pkg.config?.labels?.["orchestration.atlan.com/certified"] === "true")
.filter(
(pkg) =>
pkg.config?.labels?.["orchestration.atlan.com/type"] !== "miner" &&
pkg.config?.labels?.["orchestration.atlan.com/type"] !== "utility"
)
.map((pkg) => pkg.name);

return packages;
}
const { getPackagesToInstall, getInstalledPackages, getAllPackagesMap } = require("../lib/local-install-util");

function installPackages(packages, extraArgs, azureArtifacts) {
// Install packages
Expand All @@ -224,16 +27,16 @@ function installPackages(packages, extraArgs, azureArtifacts) {
}

async function run(
marketplacePackagesPath,
packageName,
azureArtifacts,
bypassSafetyCheck,
extraArgs,
channel,
snapshotInstall,
skipVersionCheck,
skipPackages
) {
const packagesMap = getAllPackagesMap();
const packagesMap = getAllPackagesMap(marketplacePackagesPath);
const installedPackages = await getInstalledPackages();

const initPackagesToInstall = getPackagesToInstall(
Expand Down Expand Up @@ -261,78 +64,44 @@ async function run(

// Always install numaflow packages since delete-pipelines may have deleted them
const numaflowPackages = [...packagesToInstall].filter((pkg) => pkg.isNumaflowPackage);
const connectorPackages = [...packagesToInstall].find((pkg) => "@atlan/connectors" === pkg.name)
? getConnectorPackages()
: [];
if (packageName != "@atlan/cloud-packages") {
console.log("Numaflow packages to install: " + numaflowPackages.map((pkg) => pkg.name).join(", "));
installPackages(numaflowPackages, extraArgs, azureArtifacts);
}

var safeToInstall = true;
if (!skipRunningPackagesCheck(packageName, bypassSafetyCheck)) {
// Check if running workflows have packages that need to be installed
const runningPackages = await getAllRunningPackages();
console.log("Running packages: " + runningPackages.join(", "));
const packagesToInstallNames = Array.from(packagesToInstall).map((pkg) => pkg.name);
for (const runningPackage of runningPackages) {
if (packagesToInstallNames.includes(runningPackage)) {
safeToInstall = false;
break;
}
if (connectorPackages.includes(runningPackage)) {
//If any of the connector packages are running, then we have to skip the installation of @atlan/connectors package.
safeToInstall = false;
console.log(
`Connector package ${runningPackage} is running. Skipping installation of @atlan/connectors package`
);
break;
}
}
}
console.log("Safe to install: " + safeToInstall);

if (!safeToInstall) {
console.warn("Not safe to install. Waiting for running workflows to complete before installing packages.");
// use custom exit code 100 to bypass workflow failure
// choose code 100 to avoid collision https://node.readthedocs.io/en/latest/api/process/
exit(100);
}

// Install packages
const argoPackages = [...packagesToInstall].filter((pkg) => !pkg.isNumaflowPackage);
console.log("Argo packages to install: " + argoPackages.map((pkg) => pkg.name).join(", "));

installPackages(argoPackages, extraArgs, azureArtifacts, snapshotInstall);

// Write last safe release
fs.writeFileSync(
`/tmp/atlan-update/${packageName.replace("/", "-")}-last-safe-run.txt`,
`${Math.floor(new Date().getTime())}`
);
const filePath = `/tmp/atlan-update/${packageName.replace("/", "-")}-last-safe-run.txt`;
const dirPath = path.dirname(filePath);
fs.mkdirSync(dirPath, { recursive: true });
fs.writeFileSync(filePath, `${Math.floor(new Date().getTime())}`);
}

// Take package name as input
const marketplacePackagesPath = process.argv[2];
const packageName = process.argv[3];
const azureArtifacts = process.argv[4];
const bypassSafetyCheckString = process.argv[5];
const extraArgs = process.argv[6];
const channel = process.argv[7];
const extraArgs = process.argv[5];
const channel = process.argv[6];
// snapshotInstall install package regardless of package version
// It respects bypassSafetyCheck, and added a -snapshot suffix to the version
const snapshotInstallString = process.argv[8];
const skipVersionCheckString = process.argv[9];
const skipPackages = process.argv[10];
// It adds a -snapshot suffix to the version
const snapshotInstallString = process.argv[7];
const skipVersionCheckString = process.argv[8];
const skipPackagesString = process.argv[9];

const bypassSafetyCheck = bypassSafetyCheckString === "true";
const snapshotInstall = snapshotInstallString === "true";
const skipVersionCheck = skipVersionCheckString === "true";
const skipPackages = skipPackagesString.startsWith("[") ? skipPackagesString : "[]";

run(
marketplacePackagesPath,
packageName,
azureArtifacts,
bypassSafetyCheck,
extraArgs,
channel,
snapshotInstall,
Expand Down
Loading

0 comments on commit c6ef339

Please sign in to comment.