From d4ced71f210d7a425e83c70d6e9b25ab997cd1c3 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sat, 21 Dec 2024 06:02:23 -0800 Subject: [PATCH] feat(debugging): implement x-goog-spanner-request-id propagation per request Implements propagation of the x-goog-spanner-request-id that'll be propagated for every call. Once an error has been encountered, that error will have `.requestId` set. Fixes #2200 --- src/batch-transaction.ts | 29 ++++++++++---- src/database.ts | 47 ++++++++++++++++++++++- src/index.ts | 3 ++ src/instance.ts | 1 + src/request_id_header.ts | 81 +++++++++++++++++++++++++++++++++++++++ src/session.ts | 38 ++++++++++++++++-- src/transaction.ts | 25 ++++++++++-- test/gapic_spanner_v1.ts | 8 +++- test/request_id_header.ts | 49 +++++++++++++++++++++++ 9 files changed, 264 insertions(+), 17 deletions(-) create mode 100644 src/request_id_header.ts create mode 100644 test/request_id_header.ts diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index d182d4429..03c63a4e2 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -146,6 +146,8 @@ class BatchTransaction extends Snapshot { 'BatchTransaction.createQueryPartitions', traceConfig, span => { + const database = this.session.parent as Database; + const nthRequest = database._nextNthRequest(); const headers: {[k: string]: string} = {}; if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); @@ -157,7 +159,11 @@ class BatchTransaction extends Snapshot { method: 'partitionQuery', reqOpts, gaxOpts: query.gaxOptions, - headers: headers, + headers: this.session._metadataWithRequestId( + nthRequest, + 1, + headers + ), }, (err, partitions, resp) => { if (err) { @@ -201,11 +207,16 @@ class BatchTransaction extends Snapshot { transaction: {id: this.id}, }); config.reqOpts = extend({}, query); - config.headers = { - [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database) - .formattedName_, + const database = this.session.parent as Database; + const headers = { + [CLOUD_RESOURCE_HEADER]: database.formattedName_, }; - delete query.partitionOptions; + (config.headers = this.session._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + )), + delete query.partitionOptions; this.session.request(config, (err, resp) => { if (err) { setSpanError(span, err); @@ -286,14 +297,18 @@ class BatchTransaction extends Snapshot { if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); } - + const database = this.session.parent as Database; this.createPartitions_( { client: 'SpannerClient', method: 'partitionRead', reqOpts, gaxOpts: options.gaxOptions, - headers: headers, + headers: this.session._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + ), }, (err, partitions, resp) => { if (err) { diff --git a/src/database.ts b/src/database.ts index edccf23c1..9f2f2ce9c 100644 --- a/src/database.ts +++ b/src/database.ts @@ -112,6 +112,13 @@ import { setSpanErrorAndException, traceConfig, } from './instrument'; +import { + AtomicCounter, + X_GOOG_SPANNER_REQUEST_ID_HEADER, + craftRequestId, + newAtomicCounter, +} from './request_id_header'; + export type GetDatabaseRolesCallback = RequestCallback< IDatabaseRole, databaseAdmin.spanner.admin.database.v1.IListDatabaseRolesResponse @@ -350,6 +357,8 @@ class Database extends common.GrpcServiceObject { > | null; _observabilityOptions?: ObservabilityOptions; // TODO: exmaine if we can remove it private _traceConfig: traceConfig; + private _nthRequest: AtomicCounter; + public _clientId: number; constructor( instance: Instance, name: string, @@ -483,7 +492,14 @@ class Database extends common.GrpcServiceObject { Object.assign({}, queryOptions), Database.getEnvironmentQueryOptions() ); + this._nthRequest = newAtomicCounter(0); + this._clientId = 0; + } + + _nextNthRequest(): number { + return this._nthRequest.increment(); } + /** * @typedef {array} SetDatabaseMetadataResponse * @property {object} 0 The {@link Database} metadata. @@ -699,7 +715,11 @@ class Database extends common.GrpcServiceObject { method: 'batchCreateSessions', reqOpts, gaxOpts: options.gaxOptions, - headers: headers, + headers: this._metadataWithRequestId( + this._nextNthRequest(), + 1, + headers + ), }, (err, resp) => { if (err) { @@ -723,6 +743,31 @@ class Database extends common.GrpcServiceObject { }); } + private channelId(): number { + // TODO: Infer channelId from the actual gRPC channel. + return 1; + } + + public _metadataWithRequestId( + nthRequest: number, + attempt: number, + priorMetadata?: {[k: string]: string} + ): {[k: string]: string} { + if (!priorMetadata) { + priorMetadata = {}; + } + const withReqId = { + ...priorMetadata, + }; + withReqId[X_GOOG_SPANNER_REQUEST_ID_HEADER] = craftRequestId( + this._clientId, + this.channelId(), + nthRequest, + attempt + ); + return withReqId; + } + /** * Get a reference to a {@link BatchTransaction} object. * diff --git a/src/index.ts b/src/index.ts index 9281484c9..f4a242569 100644 --- a/src/index.ts +++ b/src/index.ts @@ -86,6 +86,7 @@ import { ObservabilityOptions, ensureInitialContextManagerSet, } from './instrument'; +import {AtomicCounter, nextSpannerClientId} from './request_id_header'; // eslint-disable-next-line @typescript-eslint/no-var-requires const gcpApiConfig = require('./spanner_grpc_config.json'); @@ -248,6 +249,7 @@ class Spanner extends GrpcService { routeToLeaderEnabled = true; directedReadOptions: google.spanner.v1.IDirectedReadOptions | null; _observabilityOptions: ObservabilityOptions | undefined; + _nthClientId: number; /** * Placeholder used to auto populate a column with the commit timestamp. @@ -377,6 +379,7 @@ class Spanner extends GrpcService { this._observabilityOptions?.enableEndToEndTracing ); ensureInitialContextManagerSet(); + this._nthClientId = nextSpannerClientId(); } /** diff --git a/src/instance.ts b/src/instance.ts index 72257b24c..e15c94c5c 100644 --- a/src/instance.ts +++ b/src/instance.ts @@ -980,6 +980,7 @@ class Instance extends common.GrpcServiceObject { if (!this.databases_.has(key!)) { const db = new Database(this, name, poolOptions, queryOptions); db._observabilityOptions = this._observabilityOptions; + db._clientId = (this.parent as Spanner)._nthClientId; this.databases_.set(key!, db); } return this.databases_.get(key!)!; diff --git a/src/request_id_header.ts b/src/request_id_header.ts new file mode 100644 index 000000000..5f660baf8 --- /dev/null +++ b/src/request_id_header.ts @@ -0,0 +1,81 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {randomBytes} from 'crypto'; +const randIdForProcess = randomBytes(8).readBigUint64LE(0).toString(); +const X_GOOG_SPANNER_REQUEST_ID_HEADER = 'x-goog-spanner-request-id'; + +class AtomicCounter { + private backingBuffer: Uint32Array; + + constructor(initialValue?: number) { + this.backingBuffer = new Uint32Array( + new SharedArrayBuffer(Uint32Array.BYTES_PER_ELEMENT) + ); + if (initialValue) { + this.increment(initialValue); + } + } + + public increment(n?: number): number { + if (!n) { + n = 1; + } + Atomics.add(this.backingBuffer, 0, n); + return this.value(); + } + + public value(): number { + return Atomics.load(this.backingBuffer, 0); + } + + public toString(): string { + return `${this.value()}`; + } +} + +function craftRequestId( + nthClientId: number, + channelId: number, + nthRequest: number, + attempt: number +) { + return `1.${randIdForProcess}.${nthClientId}.${channelId}.${nthRequest}.${attempt}`; +} + +const nthClientId = new AtomicCounter(); + +/* + * nextSpannerClientId increments the internal + * counter for created SpannerClients, for use + * with x-goog-spanner-request-id. + */ +function nextSpannerClientId(): number { + nthClientId.increment(1); + return nthClientId.value(); +} + +function newAtomicCounter(n?: number): AtomicCounter { + return new AtomicCounter(n); +} + +export { + AtomicCounter, + X_GOOG_SPANNER_REQUEST_ID_HEADER, + craftRequestId, + nextSpannerClientId, + newAtomicCounter, +}; diff --git a/src/session.ts b/src/session.ts index 32d79d352..d8317693e 100644 --- a/src/session.ts +++ b/src/session.ts @@ -44,7 +44,6 @@ import { import {grpc, CallOptions} from 'google-gax'; import IRequestOptions = google.spanner.v1.IRequestOptions; import {Spanner} from '.'; - export type GetSessionResponse = [Session, r.Response]; /** @@ -317,13 +316,18 @@ export class Session extends common.GrpcServiceObject { const reqOpts = { name: this.formattedName_, }; + const database = this.parent as Database; return this.request( { client: 'SpannerClient', method: 'deleteSession', reqOpts, gaxOpts, - headers: this.commonHeaders_, + headers: database._metadataWithRequestId( + database._nextNthRequest(), + 1, + this.commonHeaders_ + ), }, callback! ); @@ -389,13 +393,18 @@ export class Session extends common.GrpcServiceObject { if (this._getSpanner().routeToLeaderEnabled) { addLeaderAwareRoutingHeader(headers); } + const database = this.parent as Database; return this.request( { client: 'SpannerClient', method: 'getSession', reqOpts, gaxOpts, - headers: headers, + headers: database._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + ), }, (err, resp) => { if (resp) { @@ -441,17 +450,33 @@ export class Session extends common.GrpcServiceObject { session: this.formattedName_, sql: 'SELECT 1', }; + + const database = this.parent as Database; return this.request( { client: 'SpannerClient', method: 'executeSql', reqOpts, gaxOpts, - headers: this.commonHeaders_, + headers: database._metadataWithRequestId( + database._nextNthRequest(), + 1, + this.commonHeaders_ + ), }, callback! ); } + + public _metadataWithRequestId( + nthRequest: number, + attempt: number, + priorMetadata?: {[k: string]: string} + ): {[k: string]: string} { + const database = this.parent as Database; + return database._metadataWithRequestId(nthRequest, attempt, priorMetadata); + } + /** * Create a PartitionedDml transaction. * @@ -534,6 +559,11 @@ export class Session extends common.GrpcServiceObject { private _getSpanner(): Spanner { return this.parent.parent.parent as Spanner; } + + private channelId(): number { + // TODO: Infer channelId from the actual gRPC channel. + return 1; + } } /*! Developer Documentation diff --git a/src/transaction.ts b/src/transaction.ts index 725ec3235..494191c98 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -446,6 +446,7 @@ export class Snapshot extends EventEmitter { opts: this._observabilityOptions, dbName: this._dbName!, }; + const database = this.session.parent as Database; return startTrace('Snapshot.begin', traceConfig, span => { span.addEvent('Begin Transaction'); @@ -455,7 +456,11 @@ export class Snapshot extends EventEmitter { method: 'beginTransaction', reqOpts, gaxOpts, - headers: headers, + headers: this.session._metadataWithRequestId( + database._nextNthRequest(), + 1, + headers + ), }, ( err: null | grpc.ServiceError, @@ -712,8 +717,12 @@ export class Snapshot extends EventEmitter { opts: this._observabilityOptions, dbName: this._dbName!, }; + return startTrace('Snapshot.createReadStream', traceConfig, span => { let attempt = 0; + const database = this.session.parent as Database; + const nthRequest = database._nextNthRequest(); + const makeRequest = (resumeToken?: ResumeToken): Readable => { if (this.id && transaction.begin) { delete transaction.begin; @@ -740,7 +749,11 @@ export class Snapshot extends EventEmitter { method: 'streamingRead', reqOpts: Object.assign({}, reqOpts, {resumeToken}), gaxOpts: gaxOptions, - headers: headers, + headers: this.session._metadataWithRequestId( + nthRequest, + attempt, + headers + ), }); }; @@ -1298,6 +1311,8 @@ export class Snapshot extends EventEmitter { }; return startTrace('Snapshot.runStream', traceConfig, span => { let attempt = 0; + const database = this.session.parent as Database; + const nthRequest = database._nextNthRequest(); const makeRequest = (resumeToken?: ResumeToken): Readable => { attempt++; @@ -1331,7 +1346,11 @@ export class Snapshot extends EventEmitter { method: 'executeStreamingSql', reqOpts: Object.assign({}, reqOpts, {resumeToken}), gaxOpts: gaxOptions, - headers: headers, + headers: this.session._metadataWithRequestId( + nthRequest, + attempt, + headers + ), }); }; diff --git a/test/gapic_spanner_v1.ts b/test/gapic_spanner_v1.ts index b98138a1a..d2220d051 100644 --- a/test/gapic_spanner_v1.ts +++ b/test/gapic_spanner_v1.ts @@ -481,10 +481,14 @@ describe('v1.SpannerClient', () => { client.innerApiCalls.batchCreateSessions as SinonStub ).getCall(0).args[0]; assert.deepStrictEqual(actualRequest, request); - const actualHeaderRequestParams = ( + const actualHeaders = ( client.innerApiCalls.batchCreateSessions as SinonStub - ).getCall(0).args[1].otherArgs.headers['x-goog-request-params']; + ).getCall(0).args[1].otherArgs.headers; + const actualHeaderRequestParams = actualHeaders['x-goog-request-params']; assert(actualHeaderRequestParams.includes(expectedHeaderRequestParams)); + const actualRequestID = actualHeaders['x-goog-spanner-request-id']; + console.log('headers', actualHeaders); + assert.deepStrictEqual(actualRequestID, 'foo'); }); it('invokes batchCreateSessions without error using callback', async () => { diff --git a/test/request_id_header.ts b/test/request_id_header.ts new file mode 100644 index 000000000..31d28621e --- /dev/null +++ b/test/request_id_header.ts @@ -0,0 +1,49 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* eslint-disable prefer-rest-params */ +import {AtomicCounter} from '../src/request_id_header'; + +describe('AtomicCounter', () => { + it('Constructor with initialValue', done => { + const ac0 = new AtomicCounter(); + assert.strictEqual(ac0.value(), 0); + assert.strictEqual( + ac0.increment(2), + 2, + 'increment should return the added value' + ); + assert.strictEqual( + ac0.value(), + 2, + 'increment should have modified the value' + ); + + const ac1 = new AtomicCounter(1); + assert.strictEqual(ac1.value(), 1); + assert.strictEqual( + ac0.increment(1 << 32), + (1 << 32) + 1, + 'increment should return the added value' + ); + assert.strictEqual( + ac0.value(), + (1 << 32) + 1, + 'increment should have modified the value' + ); + done(); + }); +});