From dc5f42c0124e524fefbcb49a3cb510d20f574091 Mon Sep 17 00:00:00 2001 From: Alka Trivedi Date: Tue, 21 Jan 2025 17:00:33 +0530 Subject: [PATCH] refactor --- .../system-test-multiplexed-session.cfg | 2 +- .kokoro/system-test.sh | 6 - .kokoro/trampoline_v2.sh | 1 + benchmark/benchmarking-multiplexed-session.js | 205 ++++++++++++++++++ owlbot.py | 2 +- package.json | 1 + src/database.ts | 1 + test/spanner.ts | 1 - 8 files changed, 210 insertions(+), 9 deletions(-) create mode 100644 benchmark/benchmarking-multiplexed-session.js diff --git a/.kokoro/presubmit/node14/system-test-multiplexed-session.cfg b/.kokoro/presubmit/node14/system-test-multiplexed-session.cfg index 8bf1160cd..aeeace4e8 100644 --- a/.kokoro/presubmit/node14/system-test-multiplexed-session.cfg +++ b/.kokoro/presubmit/node14/system-test-multiplexed-session.cfg @@ -12,6 +12,6 @@ env_vars: { } env_vars: { - key: "GOOGLE_CLOUD_SPANNER_ENABLE_MULTIPLEXED_SESSIONS" + key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS" value: "true" } \ No newline at end of file diff --git a/.kokoro/system-test.sh b/.kokoro/system-test.sh index bb20a4220..a90d5cfec 100755 --- a/.kokoro/system-test.sh +++ b/.kokoro/system-test.sh @@ -45,12 +45,6 @@ if [[ $KOKORO_BUILD_ARTIFACTS_SUBDIR = *"continuous"* ]] || [[ $KOKORO_BUILD_ART trap cleanup EXIT HUP fi -# If tests are running with enabled multiplexed session, configure env -# GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS -if [[ $GOOGLE_CLOUD_SPANNER_ENABLE_MULTIPLEXED_SESSIONS = *"true"* ]]; then - export GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS=true -fi - npm run system-test # codecov combines coverage across integration and unit tests. 
Include diff --git a/.kokoro/trampoline_v2.sh b/.kokoro/trampoline_v2.sh index 5d6cfcca5..23b5f7893 100755 --- a/.kokoro/trampoline_v2.sh +++ b/.kokoro/trampoline_v2.sh @@ -165,6 +165,7 @@ if [[ -n "${KOKORO_BUILD_ID:-}" ]]; then # For flakybot "KOKORO_GITHUB_COMMIT_URL" "KOKORO_GITHUB_PULL_REQUEST_URL" + "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS" ) elif [[ "${TRAVIS:-}" == "true" ]]; then RUNNING_IN_CI="true" diff --git a/benchmark/benchmarking-multiplexed-session.js b/benchmark/benchmarking-multiplexed-session.js new file mode 100644 index 000000000..fe112b520 --- /dev/null +++ b/benchmark/benchmarking-multiplexed-session.js @@ -0,0 +1,205 @@ +/*! + * Copyright 2025 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +// Setup OpenTelemetry and the trace exporter. +const { + NodeTracerProvider, + TraceIdRatioBasedSampler, +} = require('@opentelemetry/sdk-trace-node'); +const {BatchSpanProcessor} = require('@opentelemetry/sdk-trace-base'); + +// Create the Google Cloud Trace exporter for OpenTelemetry. +const { + TraceExporter, +} = require('@google-cloud/opentelemetry-cloud-trace-exporter'); +const exporter = new TraceExporter(); + +// Create the OpenTelemetry tracerProvider that the exporter shall be attached to. +const provider = new NodeTracerProvider({ + // Modify the following line to adjust the sampling rate. + // It is currently set to 1.0, meaning all requests will be traced. 
+  sampler: new TraceIdRatioBasedSampler(1.0), +}); +provider.addSpanProcessor(new BatchSpanProcessor(exporter)); + +// Set global propagator to propagate the trace context for end-to-end tracing. +const {propagation} = require('@opentelemetry/api'); +const {W3CTraceContextPropagator} = require('@opentelemetry/core'); +propagation.setGlobalPropagator(new W3CTraceContextPropagator()); + +const thread_execution_times = []; +const transaction_times = []; +async function main( +  instanceId, +  databaseId, +  projectId, +  method, +  multiplexedEnabled, +  numThreads, +  numQueries +) { +  multiplexedEnabled === 'true' +    ? (process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = true) +    : (process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS = false); + +  const {Spanner} = require('../build/src'); +  const {performance} = require('perf_hooks'); +  const spanner = new Spanner({ +    projectId: projectId, +    apiEndpoint: 'staging-wrenchworks.sandbox.googleapis.com', +    observabilityOptions: { +      tracerProvider: provider, +      enableExtendedTracing: true, +      enableEndToEndTracing: true, +    }, +  }); + +  // Gets a reference to a Cloud Spanner instance and database +  const instance = spanner.instance(instanceId); +  const database = instance.database(databaseId); + +  function generateQuery() { +    const id = Math.floor(Math.random() * 10) + 1; +    const query = { +      sql: 'SELECT SingerId from Singers WHERE SingerId = @id', +      params: {id: id}, +    }; +    return query; +  } +  // warm up query +  // for (let i = 0; i < 10; i++) { +  //   await database.run(generateQuery()); +  // } + +  async function singleUseTxn() { +    const startThreadTime = performance.now(); + +    for (let i = 0; i < numQueries; i++) { +      const startTime = performance.now(); +      await database.run(generateQuery()); +      setTimeout(() => {}, 1000); +      const operationTime = performance.now() - startTime; +      transaction_times.push(operationTime); +    } + +    thread_execution_times.push( +      (performance.now() - startThreadTime).toFixed(2) +    ); +  } + +  async
function multiUseTxn() { + const startThreadTime = performance.now(); + + for (let i = 0; i < numQueries; i++) { + const startTime = performance.now(); + const [txn] = await database.getSnapshot(); + await txn.run(generateQuery()); + await txn.run(generateQuery()); + await txn.run(generateQuery()); + await txn.run(generateQuery()); + txn.end(); + const operationTime = (performance.now() - startTime).toFixed(2); + transaction_times.push(operationTime); + } + + thread_execution_times.push( + (performance.now() - startThreadTime).toFixed(2) + ); + } + + function calculatePercentiles(latencies) { + // Step 1: Sort the array + const sortedLatencies = latencies.slice().sort((a, b) => a - b); + + // Step 2: Calculate average + const sum = sortedLatencies.reduce((acc, num) => acc + parseFloat(num), 0); + const average = (sum / sortedLatencies.length).toFixed(2); + + // Step 3: Calculate p50 (50th percentile) + const p50Index = Math.floor(0.5 * sortedLatencies.length); + const p50Latency = parseFloat(sortedLatencies[p50Index]).toFixed(2); + + // Step 4: Calculate p90 (90th percentile) + const p90Index = Math.floor(0.9 * sortedLatencies.length); + const p90Latency = parseFloat(sortedLatencies[p90Index]).toFixed(2); + + // Step 5: Calculate p99 (99th percentile) + const p99Index = Math.floor(0.99 * sortedLatencies.length); + const p99Latency = parseFloat(sortedLatencies[p99Index]).toFixed(2); + + return { + avg: average, + p50: p50Latency, + p90: p90Latency, + p99: p99Latency, + }; + } + + async function runConcurrently() { + const promises = []; + for (let i = 0; i < numThreads; i++) { + method === 'singleUseTxn' + ? 
promises.push(singleUseTxn()) + : promises.push(multiUseTxn()); + } + await Promise.all(promises); + console.log('excution time taken by threads are: '); + thread_execution_times.forEach(executionTime => { + console.log(executionTime); + }); + } + + try { + await runConcurrently(); + const percentiles = calculatePercentiles(transaction_times); + console.log(`average Latency: ${percentiles.avg}`); + console.log(`p50 Latency: ${percentiles.p50}`); + console.log(`p90 Latency: ${percentiles.p90}`); + console.log(`p99 Latency: ${percentiles.p99}`); + } catch (error) { + console.log('error: ', error); + } + + provider.forceFlush(); + + // This sleep gives ample time for the trace + // spans to be exported to Google Cloud Trace. + await new Promise(resolve => { + setTimeout(() => { + resolve(); + }, 8800); + }); + + // await runConcurrently() + // .then(() => { + // const percentiles = calculatePercentiles(transaction_times); + // console.log(`average Latency: ${percentiles.avg}`); + // console.log(`p50 Latency: ${percentiles.p50}`); + // console.log(`p90 Latency: ${percentiles.p90}`); + // console.log(`p99 Latency: ${percentiles.p99}`); + // }) + // .catch(error => { + // console.log('error: ', error); + // }); +} + +process.on('unhandledRejection', err => { + console.error(err.message); + process.exitCode = 1; +}); +main(...process.argv.slice(2)); diff --git a/owlbot.py b/owlbot.py index cf7c6d663..2a9b067a1 100644 --- a/owlbot.py +++ b/owlbot.py @@ -64,7 +64,7 @@ common_templates = gcp.CommonTemplates() templates = common_templates.node_library(source_location='build/src') -s.copy(templates, excludes=[".kokoro/samples-test.sh",".github/release-trigger.yml"]) +s.copy(templates, excludes=[".kokoro/samples-test.sh", ".kokoro/trampoline_v2.sh", ".github/release-trigger.yml"]) node.postprocess_gapic_library_hermetic() diff --git a/package.json b/package.json index a633f96b1..b3a63e552 100644 --- a/package.json +++ b/package.json @@ -87,6 +87,7 @@ "through2": "^4.0.0" }, 
"devDependencies": { + "@google-cloud/opentelemetry-cloud-trace-exporter": "^2.4.1", "@opentelemetry/sdk-trace-base": "^1.26.0", "@opentelemetry/sdk-trace-node": "^1.26.0", "@types/concat-stream": "^2.0.0", diff --git a/src/database.ts b/src/database.ts index 6f5ec17cb..535797e42 100644 --- a/src/database.ts +++ b/src/database.ts @@ -1011,6 +1011,7 @@ class Database extends common.GrpcServiceObject { } const session = this.session(resp!.name!); session.metadata = resp; + session._observabilityOptions = this._traceConfig!.opts; span.end(); callback(null, session, resp!); } diff --git a/test/spanner.ts b/test/spanner.ts index ec2398c9b..3dfb7cead 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -68,7 +68,6 @@ import TypeCode = google.spanner.v1.TypeCode; import NullValue = google.protobuf.NullValue; import {SessionFactory} from '../src/session-factory'; import {MultiplexedSession} from '../src/multiplexed-session'; -import {BatchTransaction} from '../src/batch-transaction'; const { AlwaysOnSampler,