diff --git a/packages/doc/docs/connectors.mdx b/packages/doc/docs/connectors.mdx index 766cc8a66..f6e0c7566 100644 --- a/packages/doc/docs/connectors.mdx +++ b/packages/doc/docs/connectors.mdx @@ -7,7 +7,7 @@ We support the following data warehouses to connect with, you can choose multipl | [PostgreSQL](./connectors/postgresql) | ✅ Yes | ✅ Yes | ❌ No | | [DuckDB](./connectors/duckdb) | ✅ Yes | ✅ Yes | ❌ No | | [Snowflake](./connectors/snowflake) | ✅ Yes | ✅ Yes | ❌ No | -| BigQuery | ✅ Yes | ✅ Yes | ❌ No | +| [BigQuery](./connectors/bigquery) | ✅ Yes | ✅ Yes | ❌ No | \* Fetching rows only when we need them, it has better performance with large query results. diff --git a/packages/doc/docs/connectors/bigquery.mdx b/packages/doc/docs/connectors/bigquery.mdx index 205b5c075..04804a381 100644 --- a/packages/doc/docs/connectors/bigquery.mdx +++ b/packages/doc/docs/connectors/bigquery.mdx @@ -27,44 +27,52 @@ Connect with your bigquery servers via the official [Node.js Driver](https://clo 3. Create a new profile in `profiles.yaml` or in your profile files. For example: :::info You can choose one from `keyFilename` or `credentials` to use. - For details, please refer to [here](https://cloud.google.com/docs/authentication#service-accounts) + + Your service account must have the following permissions to successfully execute queries. + + - BigQuery Data Viewer + - BigQuery Job User + + > + + For details, please refer to [here](https://cloud.google.com/docs/authentication#service-accounts). 
::: -wish keyFilename: - -```yaml -- name: bq # profile name - type: bq - connection: - location: '' - projectId: 'your-project-id' - keyFilename: '/path/to/keyfile.json' - allow: '*' -``` - -wish credential: - -```yaml -- name: bq # profile name - type: bq - connection: - location: US - projectId: 'your-project-id' - credential: - client_email: vulcan@projectId.iam.gserviceaccount.com - private_key: '-----BEGIN PRIVATE KEY----- XXXXX -----END PRIVATE KEY-----\n' - allow: '*' -``` + with `keyFilename`: + + ```yaml + - name: bq # profile name + type: bq + connection: + location: US + projectId: 'your-project-id' + keyFilename: '/path/to/keyfile.json' + allow: '*' + ``` + + with `credential`: + + ```yaml + - name: bq # profile name + type: bq + connection: + location: US + projectId: 'your-project-id' + credential: + client_email: vulcan@projectId.iam.gserviceaccount.com + private_key: '-----BEGIN PRIVATE KEY----- XXXXX -----END PRIVATE KEY-----\n' + allow: '*' + ``` ## Connection Configuration Please check [Interface BigQueryOptions](https://cloud.google.com/nodejs/docs/reference/bigquery/latest/bigquery/bigqueryoptions) and [Google BigQuery: Node.js Client](https://github.com/googleapis/nodejs-bigquery/blob/main/src/bigquery.ts#L173-L244) for further information. -| Name | Required | Default | Description | -| ------------------------ | -------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| location | N | US | Location must match that of the dataset(s) referenced in the query. | -| projectId | N | | The project ID from the Google Developer's Console, e.g. 'grape-spaceship-123'. 
We will also check the environment variable `GCLOUD_PROJECT` for your project ID. If your app is running in an environment which [supports](https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application Application Default Credentials), your project ID will be detected. | -| keyFilename | N | | Full path to the a .json, .pem, or .p12 key downloaded from the Google Developers Console. If you provide a path to a JSON file, the `projectId` option above is not necessary. NOTE: .pem and .p12 require you to specify the `email` option as well. | -| credentials | N | | Credentials object. | -| credentials.client_email | N | | Your service account. | -| credentials.private_key | N | | Your service account's private key. | +| Name | Required | Default | Description | +| ------------------------ | -------- | ------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| location | N | US | Location must match that of the dataset(s) referenced in the query. | +| projectId | N | | The project ID from the Google Developer's Console, e.g. 'grape-spaceship-123'. We will also check the environment variable `GCLOUD_PROJECT` for your project ID. If your app is running in an environment which [supports](https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application) Application Default Credentials, your project ID will be detected. | +| keyFilename | N | | Full path to a .json, .pem, or .p12 key downloaded from the Google Developers Console. If you provide a path to a JSON file, the `projectId` option above is not necessary. 
NOTE: .pem and .p12 require you to specify the `email` option as well. | +| credentials | N | | Credentials object. | +| credentials.client_email | N | | Your service account. | +| credentials.private_key | N | | Your service account's private key. | diff --git a/packages/extension-driver-bq/README.md b/packages/extension-driver-bq/README.md index c51f7cac4..7c680db4e 100644 --- a/packages/extension-driver-bq/README.md +++ b/packages/extension-driver-bq/README.md @@ -19,7 +19,7 @@ 3. Create a new profile in `profiles.yaml` or in your profiles' paths. -> ⚠️ Your service account must have the following permissions to successfully execute queries. +> ⚠️ Your service account must have the following permissions to successfully execute queries. > > - BigQuery Data Viewer > - BigQuery Job User @@ -32,7 +32,7 @@ location: US # Optional: The max rows we should fetch once. chunkSize: 100 - # The project ID from the Google Developer's Console, e.g. 'grape-spaceship-123'. We will also check the environment variable `GCLOUD_PROJECT` for your project ID. If your app is running in an environment which [supports](https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application Application Default Credentials), your project ID will be detected. + # The project ID from the Google Developer's Console, e.g. 'grape-spaceship-123'. We will also check the environment variable `GCLOUD_PROJECT` for your project ID. If your app is running in an environment which [supports](https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application) Application Default Credentials, your project ID will be detected. projectId: 'your-project-id' # Full path to the a .json, .pem, or .p12 key downloaded from the Google Developers Console. If you provide a path to a JSON file, the `projectId` option above is not necessary. NOTE: .pem and .p12 require you to specify the `email` option as well. 
keyFilename: '/path/to/keyfile.json' diff --git a/packages/extension-driver-bq/package.json b/packages/extension-driver-bq/package.json index f1827a641..e07ce0723 100644 --- a/packages/extension-driver-bq/package.json +++ b/packages/extension-driver-bq/package.json @@ -15,8 +15,8 @@ "data-warehouse", "data-lake", "api-builder", - "postgres", - "pg" + "bigquery", + "bq" ], "repository": { "type": "git", diff --git a/packages/extension-driver-bq/project.json b/packages/extension-driver-bq/project.json index 7c974c25e..004d6fb64 100644 --- a/packages/extension-driver-bq/project.json +++ b/packages/extension-driver-bq/project.json @@ -3,14 +3,33 @@ "sourceRoot": "packages/extension-driver-bq/src", "targets": { "build": { + "executor": "@nrwl/workspace:run-commands", + "options": { + "command": "yarn ts-node ./tools/scripts/replaceAlias.ts extension-driver-bq" + }, + "dependsOn": [ + { + "projects": "self", + "target": "tsc" + } + ] + }, + "tsc": { "executor": "@nrwl/js:tsc", "outputs": ["{options.outputPath}"], "options": { "outputPath": "dist/packages/extension-driver-bq", "main": "packages/extension-driver-bq/src/index.ts", "tsConfig": "packages/extension-driver-bq/tsconfig.lib.json", - "assets": ["packages/extension-driver-bq/*.md"] - } + "assets": ["packages/extension-driver-bq/*.md"], + "buildableProjectDepsInPackageJsonType": "dependencies" + }, + "dependsOn": [ + { + "projects": "dependencies", + "target": "build" + } + ] }, "publish": { "executor": "@nrwl/workspace:run-commands", diff --git a/packages/extension-driver-bq/src/lib/bqDataSource.ts b/packages/extension-driver-bq/src/lib/bqDataSource.ts index 60ac4a41b..ca0e43c7c 100644 --- a/packages/extension-driver-bq/src/lib/bqDataSource.ts +++ b/packages/extension-driver-bq/src/lib/bqDataSource.ts @@ -26,13 +26,14 @@ export class BQDataSource extends DataSource { const profiles = this.getProfiles().values(); for (const profile of profiles) { this.logger.debug( - `Initializing profile: ${profile.name} using pg driver` + 
`Initializing profile: ${profile.name} using bq driver` ); const bigqueryClient = new BigQuery(profile.connection); // https://cloud.google.com/nodejs/docs/reference/bigquery/latest this.bqMapping.set(profile.name, { bq: bigqueryClient, + options: profile.connection, }); // Testing connection @@ -51,9 +52,7 @@ export class BQDataSource extends DataSource { throw new InternalError(`Profile instance ${profileName} not found`); } const { bq: client, options } = this.bqMapping.get(profileName)!; - this.logger.debug(`Acquiring connection from ${profileName}`); - origin; const params: Record = {}; bindParams.forEach((value, key) => { params[key.replace('@', '')] = value; @@ -70,7 +69,6 @@ export class BQDataSource extends DataSource { const [job] = await client.createQueryJob(queryOptions); - // All promises MUST fulfilled in this function or we are not able to release the connection when error occurred return await this.getResultFromQueryJob(job, options); } catch (e: any) { this.logger.debug( @@ -89,8 +87,8 @@ export class BQDataSource extends DataSource { options?: BQOptions ): Promise { const { chunkSize = 100 } = options || {}; - const jobDataRead = this.jobDataRead.bind(this); - const firstChunk = await jobDataRead(queryJob, chunkSize); + const fetchJobResult = this.fetchJobResult.bind(this); + const firstChunk = await fetchJobResult(queryJob, chunkSize); // save first chunk in buffer for incoming requests let bufferedRows = [...firstChunk.rows]; @@ -101,7 +99,7 @@ export class BQDataSource extends DataSource { if (bufferReadIndex >= bufferedRows.length) { if (nextQuery == null) return null; - const fetchData = await jobDataRead(queryJob, chunkSize, nextQuery); + const fetchData = await fetchJobResult(queryJob, chunkSize, nextQuery); bufferedRows = fetchData.rows; nextQuery = fetchData.nextQuery; bufferReadIndex = 0; @@ -122,10 +120,6 @@ export class BQDataSource extends DataSource { this.destroy(error); }); }, - destroy(error: Error | null, cb: (error: Error | 
null) => void) { - // Send done event to notify upstream to release the connection. - cb(error); - }, // automatically destroy() the stream when it emits 'finish' or errors. Node > 10.16 autoDestroy: true, }); @@ -141,7 +135,7 @@ export class BQDataSource extends DataSource { }; } - public async jobDataRead( + public async fetchJobResult( queryJob: Job, chunkSize: number, nextQuery?: Query | null | undefined @@ -151,7 +145,7 @@ export class BQDataSource extends DataSource { nextQuery: Query | null | undefined; apiResponse: bigquery.IGetQueryResultsResponse | null | undefined; }>((resolve, reject) => { - return queryJob.getQueryResults( + queryJob.getQueryResults( nextQuery || { maxResults: chunkSize }, (err, rows, nextQuery, apiResponse) => { if (err) { diff --git a/packages/extension-driver-bq/test/bqDataSource.spec.ts b/packages/extension-driver-bq/test/bqDataSource.spec.ts index d3942bacb..dca6bd5ef 100644 --- a/packages/extension-driver-bq/test/bqDataSource.spec.ts +++ b/packages/extension-driver-bq/test/bqDataSource.spec.ts @@ -1,13 +1,10 @@ import { BQDataSource } from '../src'; import { BQflakeServer } from './bqServer'; import { streamToArray } from '@vulcan-sql/core'; -import { Writable } from 'stream'; const bigQuery = new BQflakeServer(); let dataSource: BQDataSource; -const bqTable = `\`cannerflow-286003.bq_testing_tpch.orders\``; - it('Data source should be activate without any error when all profiles are valid', async () => { // Arrange dataSource = new BQDataSource({}, '', [bigQuery.getProfile('profile1')]); @@ -16,10 +13,9 @@ it('Data source should be activate without any error when all profiles are valid await expect(dataSource.activate()).resolves.not.toThrow(); }); -it('Data source should throw error when activating if any profile is invalid', async () => { +it('Data source should throw error when activating any profile which is invalid', async () => { // Arrange const invalidProfile = bigQuery.getProfile('profile1'); - // 
invalidProfile.connection.projectId = 'invalid'; invalidProfile.connection.credentials = {}; dataSource = new BQDataSource({}, '', [ bigQuery.getProfile('profile1'), @@ -36,7 +32,7 @@ it('Data source should return correct rows with 2 chunks', async () => { await dataSource.activate(); // Act const { getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 193`, + statement: `SELECT num FROM UNNEST(GENERATE_ARRAY(1, 193)) AS num`, bindParams: new Map(), profileName: 'profile1', operations: {} as any, @@ -52,7 +48,7 @@ it('Data source should return correct rows with 1 chunk', async () => { await dataSource.activate(); // Act const { getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 12`, + statement: `SELECT num FROM UNNEST(GENERATE_ARRAY(1, 20)) AS num LIMIT 12`, bindParams: new Map(), profileName: 'profile1', operations: {} as any, @@ -68,7 +64,7 @@ it('Data source should return empty data with no row', async () => { await dataSource.activate(); // Act const { getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 0`, + statement: `SELECT num FROM UNNEST(GENERATE_ARRAY(1, 10)) AS num LIMIT 0`, bindParams: new Map(), profileName: 'profile1', operations: {} as any, @@ -78,57 +74,6 @@ it('Data source should return empty data with no row', async () => { expect(rows.length).toBe(0); }, 30000); -it('Data source should release the connection when finished no matter success or not', async () => { - // Arrange - dataSource = new BQDataSource({}, '', [bigQuery.getProfile('profile1')]); - await dataSource.activate(); - - // Act - // send parallel queries to test pool leak - const result = await Promise.all( - [ - async () => { - const { getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 1`, - bindParams: new Map(), - profileName: 'profile1', - operations: {} as any, - }); - return await streamToArray(getData()); - }, - async () => { - try { - const { 
getData } = await dataSource.execute({ - statement: 'wrong sql', - bindParams: new Map(), - profileName: 'profile1', - operations: {} as any, - }); - await streamToArray(getData()); - return [{}]; // fake data - } catch { - // ignore error - return []; - } - }, - async () => { - const { getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 1`, - bindParams: new Map(), - profileName: 'profile1', - operations: {} as any, - }); - return await streamToArray(getData()); - }, - ].map((task) => task()) - ); - - // Assert - expect(result[0].length).toBe(1); - expect(result[1].length).toBe(0); - expect(result[2].length).toBe(1); -}, 30000); - it('Data source should work with prepare statements', async () => { // Arrange dataSource = new BQDataSource({}, '', [bigQuery.getProfile('profile1')]); @@ -167,7 +112,7 @@ it('Data source should return correct column types', async () => { await dataSource.activate(); // Act const { getColumns, getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 0`, + statement: `SELECT CAST(1 as bigint) as a, true as b`, bindParams: new Map(), profileName: 'profile1', operations: {} as any, @@ -178,39 +123,6 @@ it('Data source should return correct column types', async () => { data.destroy(); // Assert - expect(column[0]).toEqual({ name: 'orderkey', type: 'number' }); - expect(column[2]).toEqual({ name: 'orderstatus', type: 'string' }); -}, 30000); - -it('Data source should release connection when readable stream is destroyed', async () => { - // Arrange - dataSource = new BQDataSource({}, '', [bigQuery.getProfile('profile1')]); - await dataSource.activate(); - // Act - const { getData } = await dataSource.execute({ - statement: `select * from ${bqTable} limit 100`, - bindParams: new Map(), - profileName: 'profile1', - operations: {} as any, - }); - const readStream = getData(); - const rows: any[] = []; - let resolve: any; - const waitForStream = () => new Promise((res) => (resolve = 
res)); - const writeStream = new Writable({ - write(chunk, _, cb) { - rows.push(chunk); - // After read 5 records, destroy the upstream - if (rows.length === 5) { - readStream.destroy(); - resolve(); - } else cb(); - }, - objectMode: true, - }); - readStream.pipe(writeStream); - await waitForStream(); - // Assert - expect(rows.length).toBe(5); - // afterEach hook will timeout if any leak occurred. + expect(column[0]).toEqual({ name: 'a', type: 'number' }); + expect(column[1]).toEqual({ name: 'b', type: 'boolean' }); }, 30000); diff --git a/packages/extension-driver-bq/test/bqServer.ts b/packages/extension-driver-bq/test/bqServer.ts index dfd178f67..b6efaafdc 100644 --- a/packages/extension-driver-bq/test/bqServer.ts +++ b/packages/extension-driver-bq/test/bqServer.ts @@ -11,8 +11,8 @@ export class BQflakeServer { return { name, type: 'bq', - location: process.env['BQ_LOCATION'], connection: { + location: process.env['BQ_LOCATION'], projectId: process.env['BQ_PROJECT_ID'], credentials: { client_email: process.env['BQ_CLIENT_EMAIL'],