From 3c25ab9dceb52663a432450fd1fb4dfa1a6e9580 Mon Sep 17 00:00:00 2001 From: Muhammad Awawdi Date: Wed, 20 Nov 2024 20:00:32 +0200 Subject: [PATCH] Node: add AZ Affinity ReadFrom strategy Support (#2686) * Added AZAffinity strategy to Node.js --------- Signed-off-by: Muhammad Awawdi --- node/src/BaseClient.ts | 14 + node/tests/GlideClient.test.ts | 24 +- node/tests/GlideClusterClient.test.ts | 460 +++++++++++++++++++++++++- 3 files changed, 482 insertions(+), 16 deletions(-) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 6ba8f975cf..7c2e3feb57 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -574,6 +574,18 @@ export interface BaseClientConfiguration { * used. */ inflightRequestsLimit?: number; + /** + * Availability Zone of the client. + * If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ, if any exist. + * + * @example + * ```typescript + * // Example configuration for setting client availability zone and read strategy + * configuration.clientAz = 'us-east-1a'; // Sets the client's availability zone + * configuration.readFrom = 'AZAffinity'; // Directs read operations to nodes within the same AZ + * ``` + */ + clientAz?: string; } /** @@ -719,6 +731,7 @@ export class BaseClient { private readonly pubsubFutures: [PromiseFunction, ErrorFunction][] = []; private pendingPushNotification: response.Response[] = []; private readonly inflightRequestsLimit: number; + private readonly clientAz: string | undefined; private config: BaseClientConfiguration | undefined; protected configurePubsub( @@ -7578,6 +7591,7 @@ export class BaseClient { readFrom, authenticationInfo, inflightRequestsLimit: options.inflightRequestsLimit, + clientAz: options.clientAz ?? 
null, }; } diff --git a/node/tests/GlideClient.test.ts b/node/tests/GlideClient.test.ts index 49f056f2b0..0c77bde519 100644 --- a/node/tests/GlideClient.test.ts +++ b/node/tests/GlideClient.test.ts @@ -51,7 +51,9 @@ const TIMEOUT = 50000; describe("GlideClient", () => { let testsFailed = 0; let cluster: ValkeyCluster; + let azCluster: ValkeyCluster; let client: GlideClient; + let azClient: GlideClient; beforeAll(async () => { const standaloneAddresses = global.STAND_ALONE_ENDPOINT; cluster = standaloneAddresses @@ -61,17 +63,28 @@ describe("GlideClient", () => { getServerVersion, ) : await ValkeyCluster.createCluster(false, 1, 1, getServerVersion); + + azCluster = standaloneAddresses + ? await ValkeyCluster.initFromExistingCluster( + false, + parseEndpoints(standaloneAddresses), + getServerVersion, + ) + : await ValkeyCluster.createCluster(false, 1, 1, getServerVersion); }, 20000); afterEach(async () => { await flushAndCloseClient(false, cluster.getAddresses(), client); + await flushAndCloseClient(false, azCluster.getAddresses(), azClient); }); afterAll(async () => { if (testsFailed === 0) { await cluster.close(); + await azCluster.close(); } else { await cluster.close(true); + await azCluster.close(); } }, TIMEOUT); @@ -1500,7 +1513,6 @@ describe("GlideClient", () => { } }, ); - runBaseTests({ init: async (protocol, configOverrides) => { const config = getClientConfigurationOption( @@ -1508,10 +1520,18 @@ describe("GlideClient", () => { protocol, configOverrides, ); + client = await GlideClient.createClient(config); + + const configNew = getClientConfigurationOption( + azCluster.getAddresses(), + protocol, + configOverrides, + ); testsFailed += 1; + azClient = await GlideClient.createClient(configNew); client = await GlideClient.createClient(config); - return { client, cluster }; + return { client, cluster, azClient, azCluster }; }, close: (testSucceeded: boolean) => { if (testSucceeded) { diff --git a/node/tests/GlideClusterClient.test.ts 
b/node/tests/GlideClusterClient.test.ts index 0768b61088..79868c36a8 100644 --- a/node/tests/GlideClusterClient.test.ts +++ b/node/tests/GlideClusterClient.test.ts @@ -27,6 +27,7 @@ import { InfoOptions, ListDirection, ProtocolVersion, + ReadFrom, RequestError, Routes, ScoreFilter, @@ -61,45 +62,80 @@ const TIMEOUT = 50000; describe("GlideClusterClient", () => { let testsFailed = 0; let cluster: ValkeyCluster; + let azCluster: ValkeyCluster; let client: GlideClusterClient; + let azClient: GlideClusterClient; beforeAll(async () => { const clusterAddresses = global.CLUSTER_ENDPOINTS; - // Connect to cluster or create a new one based on the parsed addresses - cluster = clusterAddresses - ? await ValkeyCluster.initFromExistingCluster( - true, - parseEndpoints(clusterAddresses), - getServerVersion, - ) - : // setting replicaCount to 1 to facilitate tests routed to replicas - await ValkeyCluster.createCluster(true, 3, 1, getServerVersion); - }, 40000); + + if (clusterAddresses) { + // Initialize current cluster from existing addresses + cluster = await ValkeyCluster.initFromExistingCluster( + true, + parseEndpoints(clusterAddresses), + getServerVersion, + ); + + // Initialize cluster from existing addresses for AzAffinity test + azCluster = await ValkeyCluster.initFromExistingCluster( + true, + parseEndpoints(clusterAddresses), + getServerVersion, + ); + } else { + cluster = await ValkeyCluster.createCluster( + true, + 3, + 1, + getServerVersion, + ); + + azCluster = await ValkeyCluster.createCluster( + true, + 3, + 4, + getServerVersion, + ); + } + }, 120000); afterEach(async () => { await flushAndCloseClient(true, cluster.getAddresses(), client); + await flushAndCloseClient(true, azCluster.getAddresses(), azClient); }); afterAll(async () => { if (testsFailed === 0) { - await cluster.close(); + if (cluster) await cluster.close(); + if (azCluster) await azCluster.close(); } else { - await cluster.close(true); + if (cluster) await cluster.close(true); + if (azCluster) 
await azCluster.close(true); } }); runBaseTests({ init: async (protocol, configOverrides) => { - const config = getClientConfigurationOption( + const configCurrent = getClientConfigurationOption( cluster.getAddresses(), protocol, configOverrides, ); + client = await GlideClusterClient.createClient(configCurrent); + + const configNew = getClientConfigurationOption( + azCluster.getAddresses(), + protocol, + configOverrides, + ); + azClient = await GlideClusterClient.createClient(configNew); testsFailed += 1; - client = await GlideClusterClient.createClient(config); return { client, + azClient, cluster, + azCluster, }; }, close: (testSucceeded: boolean) => { @@ -1966,4 +2002,400 @@ describe("GlideClusterClient", () => { } }, ); + describe("GlideClusterClient - AZAffinity Read Strategy Test", () => { + async function getNumberOfReplicas( + azClient: GlideClusterClient, + ): Promise { + const replicationInfo = await azClient.customCommand([ + "INFO", + "REPLICATION", + ]); + + if (Array.isArray(replicationInfo)) { + // Handle array response from cluster (CME Mode) + let totalReplicas = 0; + + for (const node of replicationInfo) { + const nodeInfo = node as { + key: string; + value: string | string[] | null; + }; + + if (typeof nodeInfo.value === "string") { + const lines = nodeInfo.value.split(/\r?\n/); + const connectedReplicasLine = lines.find( + (line) => + line.startsWith("connected_slaves:") || + line.startsWith("connected_replicas:"), + ); + + if (connectedReplicasLine) { + const parts = connectedReplicasLine.split(":"); + const numReplicas = parseInt(parts[1], 10); + + if (!isNaN(numReplicas)) { + // Sum up replicas from each primary node + totalReplicas += numReplicas; + } + } + } + } + + if (totalReplicas > 0) { + return totalReplicas; + } + + throw new Error( + "Could not find replica information in any node's response", + ); + } + + throw new Error( + "Unexpected response format from INFO REPLICATION command", + ); + } + + it.each([ProtocolVersion.RESP2, 
ProtocolVersion.RESP3])( + "should route GET commands to all replicas with the same AZ using protocol %p", + async (protocol) => { + const az = "us-east-1a"; + const GET_CALLS_PER_REPLICA = 3; + + let client_for_config_set; + let client_for_testing_az; + + try { + // Stage 1: Configure nodes + client_for_config_set = + await GlideClusterClient.createClient( + getClientConfigurationOption( + azCluster.getAddresses(), + protocol, + ), + ); + + // Skip test if version is below 8.0.0 + if (cluster.checkIfServerVersionLessThan("8.0.0")) { + console.log( + "Skipping test: requires Valkey 8.0.0 or higher", + ); + return; + } + + await client_for_config_set.customCommand([ + "CONFIG", + "RESETSTAT", + ]); + await client_for_config_set.customCommand( + ["CONFIG", "SET", "availability-zone", az], + { route: "allNodes" }, + ); + + // Retrieve the number of replicas dynamically + const n_replicas = await getNumberOfReplicas( + client_for_config_set, + ); + + if (n_replicas === 0) { + throw new Error( + "No replicas found in the cluster. 
Test requires at least one replica.", + ); + } + + const GET_CALLS = GET_CALLS_PER_REPLICA * n_replicas; + const get_cmdstat = `calls=${GET_CALLS_PER_REPLICA}`; + + // Stage 2: Create AZ affinity client and verify configuration + client_for_testing_az = + await GlideClusterClient.createClient( + getClientConfigurationOption( + azCluster.getAddresses(), + protocol, + { + readFrom: "AZAffinity" as ReadFrom, + clientAz: az, + }, + ), + ); + + const azs = await client_for_testing_az.customCommand( + ["CONFIG", "GET", "availability-zone"], + { route: "allNodes" }, + ); + + if (Array.isArray(azs)) { + const allAZsMatch = azs.every((node) => { + const nodeResponse = node as { + key: string; + value: string | number; + }; + + if (protocol === ProtocolVersion.RESP2) { + // RESP2: Direct array format ["availability-zone", "us-east-1a"] + return ( + Array.isArray(nodeResponse.value) && + nodeResponse.value[1] === az + ); + } else { + // RESP3: Nested object format [{ key: "availability-zone", value: "us-east-1a" }] + return ( + Array.isArray(nodeResponse.value) && + nodeResponse.value[0]?.key === + "availability-zone" && + nodeResponse.value[0]?.value === az + ); + } + }); + expect(allAZsMatch).toBe(true); + } else { + throw new Error( + "Unexpected response format from CONFIG GET command", + ); + } + + // Stage 3: Set test data and perform GET operations + await client_for_testing_az.set("foo", "testvalue"); + + for (let i = 0; i < GET_CALLS; i++) { + await client_for_testing_az.get("foo"); + } + + // Stage 4: Verify GET commands were routed correctly + const info_result = + await client_for_testing_az.customCommand( + ["INFO", "ALL"], // Get both replication and commandstats info + { route: "allNodes" }, + ); + + if (Array.isArray(info_result)) { + const matching_entries_count = info_result.filter( + (node) => { + const nodeInfo = node as { + key: string; + value: string | string[] | null; + }; + const infoStr = + nodeInfo.value?.toString() || ""; + + // Check if this is a 
replica node AND it has the expected number of GET calls + const isReplicaNode = + infoStr.includes("role:slave") || + infoStr.includes("role:replica"); + + return ( + isReplicaNode && + infoStr.includes(get_cmdstat) + ); + }, + ).length; + + expect(matching_entries_count).toBe(n_replicas); // Should expect 12 as the cluster was created with 3 primary and 4 replicas, totalling 12 replica nodes + } else { + throw new Error( + "Unexpected response format from INFO command", + ); + } + } finally { + // Cleanup + await client_for_config_set?.close(); + await client_for_testing_az?.close(); + } + }, + ); + }); + describe("GlideClusterClient - AZAffinity Routing to 1 replica", () => { + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "should route commands to single replica with AZ using protocol %p", + async (protocol) => { + const az = "us-east-1a"; + const GET_CALLS = 3; + const get_cmdstat = `calls=${GET_CALLS}`; + let client_for_config_set; + let client_for_testing_az; + + try { + // Stage 1: Configure nodes + client_for_config_set = + await GlideClusterClient.createClient( + getClientConfigurationOption( + azCluster.getAddresses(), + protocol, + ), + ); + + // Skip test if version is below 8.0.0 + if (cluster.checkIfServerVersionLessThan("8.0.0")) { + console.log( + "Skipping test: requires Valkey 8.0.0 or higher", + ); + return; + } + + await client_for_config_set.customCommand( + ["CONFIG", "SET", "availability-zone", ""], + { route: "allNodes" }, + ); + + await client_for_config_set.customCommand([ + "CONFIG", + "RESETSTAT", + ]); + + await client_for_config_set.customCommand( + ["CONFIG", "SET", "availability-zone", az], + { route: { type: "replicaSlotId", id: 12182 } }, + ); + + // Stage 2: Create AZ affinity client and verify configuration + client_for_testing_az = + await GlideClusterClient.createClient( + getClientConfigurationOption( + azCluster.getAddresses(), + protocol, + { + readFrom: "AZAffinity", + clientAz: az, + }, + ), + ); + await 
client_for_testing_az.set("foo", "testvalue"); + + for (let i = 0; i < GET_CALLS; i++) { + await client_for_testing_az.get("foo"); + } + + // Stage 4: Verify GET commands were routed correctly + const info_result = + await client_for_testing_az.customCommand( + ["INFO", "ALL"], + { route: "allNodes" }, + ); + + // Process the info_result to check that only one replica has the GET calls + if (Array.isArray(info_result)) { + // Count the number of nodes where both get_cmdstat and az are present + const matching_entries_count = info_result.filter( + (node) => { + const nodeInfo = node as { + key: string; + value: string | string[] | null; + }; + const infoStr = + nodeInfo.value?.toString() || ""; + return ( + infoStr.includes(get_cmdstat) && + infoStr.includes(`availability_zone:${az}`) + ); + }, + ).length; + + expect(matching_entries_count).toBe(1); + + // Check that only one node has the availability zone set to az + const changed_az_count = info_result.filter((node) => { + const nodeInfo = node as { + key: string; + value: string | string[] | null; + }; + const infoStr = nodeInfo.value?.toString() || ""; + return infoStr.includes(`availability_zone:${az}`); + }).length; + + expect(changed_az_count).toBe(1); + } else { + throw new Error( + "Unexpected response format from INFO command", + ); + } + } finally { + await client_for_config_set?.close(); + await client_for_testing_az?.close(); + } + }, + ); + }); + describe("GlideClusterClient - AZAffinity with Non-existing AZ", () => { + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "should route commands to a replica when AZ does not exist using protocol %p", + async (protocol) => { + const GET_CALLS = 4; + const replica_calls = 1; + const get_cmdstat = `cmdstat_get:calls=${replica_calls}`; + let client_for_testing_az; + + try { + // Skip test if server version is below 8.0.0 + if (azCluster.checkIfServerVersionLessThan("8.0.0")) { + console.log( + "Skipping test: requires Valkey 8.0.0 or higher", + ); + 
return; + } + + // Create a client configured for AZAffinity with a non-existing AZ + client_for_testing_az = + await GlideClusterClient.createClient( + getClientConfigurationOption( + azCluster.getAddresses(), + protocol, + { + readFrom: "AZAffinity", + clientAz: "non-existing-az", + requestTimeout: 2000, + }, + ), + ); + + // Reset command stats on all nodes + await client_for_testing_az.customCommand( + ["CONFIG", "RESETSTAT"], + { route: "allNodes" }, + ); + + // Issue GET commands + for (let i = 0; i < GET_CALLS; i++) { + await client_for_testing_az.get("foo"); + } + + // Fetch command stats from all nodes + const info_result = + await client_for_testing_az.customCommand( + ["INFO", "COMMANDSTATS"], + { route: "allNodes" }, + ); + + // Inline matching logic + let matchingEntriesCount = 0; + + if ( + typeof info_result === "object" && + info_result !== null + ) { + const nodeResponses = Object.values(info_result); + + for (const response of nodeResponses) { + if ( + response && + typeof response === "object" && + "value" in response && + response.value.includes(get_cmdstat) + ) { + matchingEntriesCount++; + } + } + } else { + throw new Error( + "Unexpected response format from INFO command", + ); + } + + // Validate that 4 nodes each report `cmdstat_get:calls=1`, i.e. the GET calls were spread across nodes rather than sent to a single replica + expect(matchingEntriesCount).toBe(4); + } finally { + // Cleanup: Close the client after test execution + await client_for_testing_az?.close(); + } + }, + ); + }); });