Skip to content

Commit

Permalink
Basic support for Neptune. (Will need a pool for 1 tnx per connection…
Browse files Browse the repository at this point in the history
… limitation).
  • Loading branch information
Jasper Blues authored and Jasper Blues committed Jan 31, 2025
1 parent d494e52 commit 2e83e59
Show file tree
Hide file tree
Showing 10 changed files with 12,340 additions and 7,362 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ _Please report any broken links. Even better, a PR is very welcome! 🙏_

Drivine is a graph database client for Node.js and TypeScript. It was created with the following design goals:

* Support multiple graph databases (simultaneously, if you wish). Currently [AgensGraph](https://bitnine.net/agensgraph/) and
[Neo4j](https://neo4j.com/neo4j-graph-database/) (or other BOLT compatible graph DBs).
* Support multiple graph databases (simultaneously, if you wish). Currently, [AgensGraph](https://bitnine.net/agensgraph/), [Amazon Neptune](https://aws.amazon.com/neptune/) and
[Neo4j](https://neo4j.com/neo4j-graph-database/) (or other BOLT compatible graph DBs) are supported. There is a branch for [FalkorDB](https://www.falkordb.com/) that requires support for transactions to be added.
* **Scale to hundreds and thousands of transactions per second, without compromising architectural integrity.**

---------------------------------------

With regards to the second point on scaleability, let's break that down into component facets.
In regard to the second point on scalability, let's break that down into component facets.

## Features

Expand Down
19,494 changes: 12,150 additions & 7,344 deletions package-lock.json

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@liberation-data/drivine",
"version": "2.7.0",
"version": "2.7.1",
"description": "Best and fastest graph database client for TypeScript / Node.js. Provides a level of abstraction for building highly scalable applications, without compromising architectural integrity",
"author": "Jasper Blues | Liberation Data",
"license": "LGPL-3.0-or-later or Apache-2.0",
Expand Down Expand Up @@ -59,11 +59,16 @@
"@nestjs/platform-express": ">= 6 <= 10",
"class-transformer": ">= ^0.5.1",
"class-validator": ">= 0.14.0",
"neo4j-driver": ">= ^5.4.0",
"neo4j-driver": "^5.27.0",
"reflect-metadata": ">= ^0.1.13"
},
"dependencies": {
"@aws-crypto/sha256-js": "^5.2.0",
"@aws-sdk/credential-provider-node": "^3.738.0",
"@aws-sdk/protocol-http": "^3.370.0",
"@aws-sdk/signature-v4": "^3.370.0",
"async-mutex": "^0.4.0",
"aws-sdk": "^2.1692.0",
"cli-color": "^2.0.3",
"dot-prop": "^8.0.2",
"dotenv": "^16.3.1",
Expand Down
91 changes: 91 additions & 0 deletions src/boltTest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import neo4j from "neo4j-driver";
import { HttpRequest } from "@aws-sdk/protocol-http";
import { defaultProvider } from "@aws-sdk/credential-provider-node";
import { SignatureV4 } from "@aws-sdk/signature-v4";
import crypto from "@aws-crypto/sha256-js";
const { Sha256 } = crypto;
import assert from "node:assert";

const region = "ap-southeast-2";
const serviceName = "neptune-db";
const host = "db-neptune-2-instance-1.cokvzz862p37.ap-southeast-2.neptune.amazonaws.com";
const port = 8182;
const protocol = "bolt";
const hostPort = host + ":" + port;
const url = protocol + "://" + hostPort;
const createQuery = "CREATE (n:Greeting {message: 'Hello'}) RETURN ID(n)";
const readQuery = "MATCH(n:Greeting) WHERE ID(n) = $id RETURN n.message";

async function signedHeader() {
const req = new HttpRequest({
method: "GET",
protocol: protocol,
hostname: host,
port: port,
// Comment out the following line if you're using an engine version older than 1.2.0.0
path: "/opencypher",
headers: {
host: hostPort
}
});

const signer = new SignatureV4({
credentials: defaultProvider(),
region: region,
service: serviceName,
sha256: Sha256
});

return signer.sign(req, { unsignableHeaders: new Set(["x-amz-content-sha256"]) })
.then((signedRequest) => {
const authInfo = {
"Authorization": signedRequest.headers["authorization"],
"HttpMethod": signedRequest.method,
"X-Amz-Date": signedRequest.headers["x-amz-date"],
"Host": signedRequest.headers["host"],
"X-Amz-Security-Token": signedRequest.headers["x-amz-security-token"]
};
return JSON.stringify(authInfo);
});
}

async function createDriver() {
let authToken = { scheme: "basic", realm: "realm", principal: "username", credentials: await signedHeader() };

return neo4j.driver(url, authToken, {
encrypted: "ENCRYPTION_ON",
trust: "TRUST_SYSTEM_CA_SIGNED_CERTIFICATES",
maxConnectionPoolSize: 1,
// logging: neo4j.logging.console("debug")
}
);
}

function unmanagedTxn(driver) {
const session = driver.session();
const tx = session.beginTransaction();
tx.run(createQuery)
.then((res) => {
const id = res.records[0].get(0);
return tx.run(readQuery, { id: id });
})
.then((res) => {
// All good, the transaction will be committed
const msg = res.records[0].get("n.message");
assert.equal(msg, "Hello");
})
.catch(err => {
// The transaction will be rolled back, now handle the error.
console.log(err);
})
.then(() => session.close());
}

createDriver()
.then((driver) => {
unmanagedTxn(driver);
driver.close();
})
.catch((err) => {
console.log(err);
});
37 changes: 36 additions & 1 deletion src/connection/ConnectionProviderBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { AgensGraphConnectionProvider } from '@/connection/AgensGraphConnectionP
import { DatabaseRegistry } from '@/connection/DatabaseRegistry';
import { ConnectionProperties } from '@/connection/ConnectionProperties';
import { DrivineLogger } from '@/logger';
import { NeptuneConnectionProvider } from '@/connection/NeptuneConnectionProvider';

export class ConnectionProviderBuilder {
private logger = new DrivineLogger(ConnectionProviderBuilder.name);
Expand Down Expand Up @@ -110,7 +111,10 @@ export class ConnectionProviderBuilder {
this.registry.register(this.buildAgensGraphAndPostgresProvider(name));
} else if (this._type === DatabaseType.NEO4J) {
this.registry.register(this.buildNeo4jProvider(name));
} else {
} else if (this._type === DatabaseType.NEPTUNE) {
this.registry.register(this.buildNeptuneProvider(name));
}
else {
throw new DrivineError(`Type ${this._type} is not supported by ConnectionProviderBuilder`);
}
return this.registry.connectionProvider(name)!;
Expand Down Expand Up @@ -168,4 +172,35 @@ export class ConnectionProviderBuilder {
}
);
}

private buildNeptuneProvider(name: string): ConnectionProvider {
if (this._userName) {
this.logger.warn(`userName is not supported by Neptune`);
}
if (this._password) {
this.logger.warn(`password is not supported by Neptune`);
}
if (this._idleTimeout) {
this.logger.warn(`idleTimeout is not supported by Neptune`);
}
if (!this._port) {
this._port = 8182;
}
if (this._port !== 8182) {
this.logger.warn(`${this._port} is a non-standard port for Neptune`);
}

return new NeptuneConnectionProvider(
name,
this._type,
this._host,
this._port,
this._protocol,
{
connectionTimeout: this._connectionTimeout
}
);
}


}
3 changes: 2 additions & 1 deletion src/connection/DatabaseType.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export enum DatabaseType {
AGENS_GRAPH = 'AGENS_GRAPH',
NEO4J = 'NEO4J',
POSTGRES = 'POSTGRES'
POSTGRES = 'POSTGRES',
NEPTUNE = 'NEPTUNE'
}
47 changes: 47 additions & 0 deletions src/connection/NeptuneConnectionProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { ConnectionProvider } from '@/connection/ConnectionProvider';
import { Config, Driver } from 'neo4j-driver';
import { Connection } from '@/connection/Connection';
import { Neo4jConnection } from '@/connection/Neo4jConnection';
import { Neo4jResultMapper } from '@/mapper/Neo4jResultMapper';
import * as neo from 'neo4j-driver'
import { DatabaseType } from '@/connection/DatabaseType';
import ShortUniqueId from 'short-unique-id';

const shortId = new ShortUniqueId({ length: 7 });

export class NeptuneConnectionProvider implements ConnectionProvider {

private driver: Driver;

constructor(
readonly name: string,
readonly type: DatabaseType,
readonly host: string,
readonly port: number,
readonly protocol: string = 'bolt',
readonly config: Config
) {

const url = `${this.protocol}://${this.host}:${this.port}`;
const authToken = { scheme: "basic", realm: "realm", principal: "username", credentials: "" };

this.driver = neo.driver(url, authToken, {
...this.config,
encrypted: "ENCRYPTION_ON",
trust: "TRUST_SYSTEM_CA_SIGNED_CERTIFICATES",
maxConnectionPoolSize: 1,
});
}

async connect(): Promise<Connection> {
const session = this.driver.session();
session['sessionId'] = shortId.rnd();
const connection = new Neo4jConnection(session, new Neo4jResultMapper());
return Promise.resolve(connection);
}

async end(): Promise<void> {
return this.driver.close();
}

}
2 changes: 1 addition & 1 deletion src/logger/StatementLogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class StatementLogger {
this.logger.verbose({
...query,
sessionId: this.sessionId,
elapsed: `${uSec} µsec`
elapsed: `${uSec} µsec = ${uSec / 1000} ms`,
});
}
}
4 changes: 1 addition & 3 deletions test/2.Integration/HealthRepository.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ describe('HealthRepository', () => {
imports: [
DrivineModule.withOptions(<DrivineModuleOptions>{
connectionProviders: [
DatabaseRegistry.buildOrResolveFromEnv(),
DatabaseRegistry.buildOrResolveFromEnv('TRAFFIC'),
DatabaseRegistry.buildOrResolveFromEnv('POSTGRES')
DatabaseRegistry.buildOrResolveFromEnv()
]
})
],
Expand Down
9 changes: 2 additions & 7 deletions test/2.Integration/HealthRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import { InjectPersistenceManager, PersistenceManager, QuerySpecification } from
@Injectable()
export class HealthRepository {
constructor(
@InjectPersistenceManager() readonly persistenceManager: PersistenceManager,
@InjectPersistenceManager('POSTGRES') readonly pgManager: PersistenceManager
@InjectPersistenceManager() readonly persistenceManager: PersistenceManager
) {}

async countAllMetros(): Promise<number> {
return this.persistenceManager.getOne<any>(new QuerySpecification(`match (n:Metro) return count(n) as count`));
return this.persistenceManager.getOne<any>(new QuerySpecification(`match (n) return count(n) as count`));
}

async filterTest(): Promise<number[]> {
Expand All @@ -19,8 +18,4 @@ export class HealthRepository {
)
);
}

async pgTables(): Promise<any[]> {
return this.pgManager.query(new QuerySpecification(`select * from pg_catalog.pg_tables`));
}
}

0 comments on commit 2e83e59

Please sign in to comment.