Skip to content

Commit

Permalink
Implement x-arango-async: store
Browse files Browse the repository at this point in the history
Fixes DE-610.
  • Loading branch information
pluma4345 committed Oct 23, 2023
1 parent b7c4f23 commit ed0340b
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 12 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,24 @@ This driver uses semantic versioning:
- A change in the major version (e.g. 1.Y.Z -> 2.0.0) indicates _breaking_
changes that require changes in your code to upgrade.

## [Unreleased]

### Added

- Added `db.createJob` method to convert arbitrary requests into async jobs (DE-610)

This method can be used to set the `x-arango-async: store` header on any
request, which will cause the server to store the request in an async job:

```js
const collectionsJob = await db.createJob(() => db.collections());
// once loaded, collectionsJob.result will be an array of Collection instances
const numbersJob = await db.createJob(() =>
db.query(aql`FOR i IN 1..1000 RETURN i`)
);
// once loaded, numbersJob.result will be an ArrayCursor of numbers
```

## [8.5.0] - 2023-10-09

### Added
Expand Down Expand Up @@ -1725,6 +1743,7 @@ For a detailed list of changes between pre-release versions of v7 see the

Graph methods now only return the relevant part of the response body.

[unreleased]: https://github.com/arangodb/arangojs/compare/v8.5.0...HEAD
[8.5.0]: https://github.com/arangodb/arangojs/compare/v8.4.1...v8.5.0
[8.4.1]: https://github.com/arangodb/arangojs/compare/v8.4.0...v8.4.1
[8.4.0]: https://github.com/arangodb/arangojs/compare/v8.3.1...v8.4.0
Expand Down
90 changes: 87 additions & 3 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1755,6 +1755,17 @@ export type LogEntries = {
text: string[];
};

type TrappedError = {
error: true;
};

type TrappedRequest = {
error?: false;
jobId: string;
onResolve: (res: ArangojsResponse) => void;
onReject: (error: any) => void;
};

/**
* An object representing a single ArangoDB database. All arangojs collections,
* cursors, analyzers and so on are linked to a `Database` object.
Expand All @@ -1766,6 +1777,7 @@ export class Database {
protected _collections = new Map<string, Collection>();
protected _graphs = new Map<string, Graph>();
protected _views = new Map<string, View>();
protected _trapRequest?: (trapped: TrappedError | TrappedRequest) => void;

/**
* Creates a new `Database` instance with its own connection pool.
Expand Down Expand Up @@ -1905,6 +1917,49 @@ export class Database {
return new Route(this, path, headers);
}

/**
* Creates an async job by executing the given callback function. The first
* database request performed by the callback will be marked for asynchronous
* execution and its result will be made available as an async job.
*
* Returns a {@link Job} instance that can be used to retrieve the result
* of the callback function once the request has been executed.
*
* @param callback - Callback function to execute as an async job.
*
* @example
* ```js
* const db = new Database();
* const job = await db.createJob(() => db.collections());
* while (!job.isLoaded) {
* await timeout(1000);
* await job.load();
* }
* // job.result is a list of Collection instances
* ```
*/
async createJob<T>(callback: () => Promise<T>): Promise<Job<T>> {
const trap = new Promise<TrappedError | TrappedRequest>((resolveTrap) => {
this._trapRequest = (trapped) => resolveTrap(trapped);
});
const eventualResult = callback();
const trapped = await trap;
if (trapped.error) return eventualResult as Promise<any>;
const { jobId, onResolve, onReject } = trapped as TrappedRequest;
return new Job(
this,
jobId,
(res) => {
onResolve(res);
return eventualResult;
},
(e) => {
onReject(e);
return eventualResult;
}
);
}

/**
* @internal
*
Expand All @@ -1918,7 +1973,7 @@ export class Database {
* @param transform - An optional function to transform the low-level
* response object to a more useful return value.
*/
request<T = any>(
async request<T = any>(
options: RequestOptions & { absolutePath?: boolean },
transform?: (res: ArangojsResponse) => T
): Promise<T>;
Expand All @@ -1934,11 +1989,11 @@ export class Database {
* @param transform - If set to `false`, the raw response object will be
* returned.
*/
request(
async request(
options: RequestOptions & { absolutePath?: boolean },
transform: false
): Promise<ArangojsResponse>;
request<T = any>(
async request<T = any>(
{
absolutePath = false,
basePath,
Expand All @@ -1949,6 +2004,35 @@ export class Database {
if (!absolutePath) {
basePath = `/_db/${encodeURIComponent(this._name)}${basePath || ""}`;
}
if (this._trapRequest) {
const trap = this._trapRequest;
this._trapRequest = undefined;
return new Promise<T>(async (resolveRequest, rejectRequest) => {
const options = { ...opts };
options.headers = { ...options.headers, "x-arango-async": "store" };
let jobRes: ArangojsResponse;
try {
jobRes = await this._connection.request({ basePath, ...options });
} catch (e) {
trap({ error: true });
rejectRequest(e);
return;
}
const jobId = jobRes.headers["x-arango-async-id"] as string;
trap({
jobId,
onResolve: (res) => {
const result = transform ? transform(res) : (res as T);
resolveRequest(result);
return result;
},
onReject: (err) => {
rejectRequest(err);
throw err;
},
});
});
}
return this._connection.request(
{ basePath, ...opts },
transform || undefined
Expand Down
40 changes: 31 additions & 9 deletions src/job.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
import { Database } from "./database";
import { ArangojsResponse } from "./lib/request.node";

/**
* Represents an async job in a {@link database.Database}.
*/
export class Job<T = any> {
protected _id: string;
protected _db: Database;
protected _transformResponse?: (res: ArangojsResponse) => Promise<T>;
protected _transformError?: (error: any) => Promise<T>;
protected _loaded: boolean = false;
protected _result: T | undefined;

/**
* @internal
*/
constructor(db: Database, id: string) {
constructor(
db: Database,
id: string,
transformResponse?: (res: ArangojsResponse) => Promise<T>,
transformError?: (error: any) => Promise<T>
) {
this._db = db;
this._id = id;
this._transformResponse = transformResponse;
this._transformError = transformError;
}

/**
Expand Down Expand Up @@ -49,16 +59,28 @@ export class Job<T = any> {
*/
async load(): Promise<T | undefined> {
if (!this.isLoaded) {
const res = await this._db.request(
{
method: "PUT",
path: `/_api/job/${this._id}`,
},
false
);
let res: ArangojsResponse;
try {
res = await this._db.request(
{
method: "PUT",
path: `/_api/job/${this._id}`,
},
false
);
} catch (e) {
if (this._transformError) {
return this._transformError(e);
}
throw e;
}
if (res.statusCode !== 204) {
this._loaded = true;
this._result = res.body;
if (this._transformResponse) {
this._result = await this._transformResponse(res);
} else {
this._result = res.body;
}
}
}
return this._result;
Expand Down

0 comments on commit ed0340b

Please sign in to comment.