Skip to content

Commit

Permalink
Offer CSV export (#38), begin introducing streaming for exports
Browse files Browse the repository at this point in the history
  • Loading branch information
davidje13 committed Dec 19, 2024
1 parent b2f3dc2 commit 2debac1
Show file tree
Hide file tree
Showing 10 changed files with 551 additions and 38 deletions.
54 changes: 54 additions & 0 deletions backend/src/export/CSVFormatter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { Writable } from 'node:stream';
import { CSVFormatter } from './CSVFormatter';

describe('CSVFormatter', () => {
it('streams the given CSV table into a buffer', async () => {
const formatter = new CSVFormatter();
const stream = TestStream();
await formatter.stream(stream.writable, [
['a', 'b'],
['c', 'd'],
]);
expect(stream.getValue()).toEqual('a,b\nc,d\n');
});

it('quotes values with special characters', async () => {
const formatter = new CSVFormatter();
const stream = TestStream();
await formatter.stream(stream.writable, [['a\nb', 'c"']]);
expect(stream.getValue()).toEqual('"a\nb","c"""\n');
});

it('consumes iterators in the input', async () => {
const formatter = new CSVFormatter();
const stream = TestStream();
let advance = () => {};
const delay = new Promise<void>((resolve) => {
advance = resolve;
});
const rowGenerator = (async function* () {
yield ['a', 'b'];
await delay;
yield ['c', 'd'];
})();
const promise = formatter.stream(stream.writable, rowGenerator);
await new Promise(process.nextTick);
expect(stream.getValue()).toEqual('a,b\n');
advance();
await promise;
expect(stream.getValue()).toEqual('a,b\nc,d\n');
});
});

const TestStream = () => {
const chunks: string[] = [];
return {
writable: new Writable({
write(chunk, _, callback) {
chunks.push(chunk);
callback();
},
}),
getValue: () => chunks.join(''),
};
};
62 changes: 62 additions & 0 deletions backend/src/export/CSVFormatter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import type { Writable } from 'stream';

type MaybeAsyncIterable<T> = Iterable<T> | AsyncIterable<T>;

export class CSVFormatter {
private readonly escapedQuote: string;
constructor(
private readonly delimiter = ',',
private readonly newline = '\n',
private readonly quote = '"',
) {
this.escapedQuote = quote + quote;
}

private encodeCSVCell(target: Writable, content: string) {
if (SIMPLE_CSV_CELL.test(content)) {
target.write(content);
} else {
target.write(this.quote);
target.write(content.replaceAll(this.quote, this.escapedQuote));
target.write(this.quote);
}
}

async stream(
target: Writable,
rows: MaybeAsyncIterable<MaybeAsyncIterable<string>>,
) {
for await (const row of rows) {
let col = 0;
if (Symbol.iterator in row) {
for (const cell of row) {
if (target.writableNeedDrain) {
await awaitDrain(target);
}
if (col) {
target.write(this.delimiter);
}
this.encodeCSVCell(target, cell);
++col;
}
} else {
for await (const cell of row) {
if (target.writableNeedDrain) {
await awaitDrain(target);
}
if (col) {
target.write(this.delimiter);
}
this.encodeCSVCell(target, cell);
++col;
}
}
target.write(this.newline);
}
}
}

const SIMPLE_CSV_CELL = /^[^"':;\\\r\n\t ]*$/i;

const awaitDrain = (target: Writable) =>
new Promise<void>((resolve) => target.once('drain', resolve));
68 changes: 68 additions & 0 deletions backend/src/export/JSONFormatter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { Writable } from 'node:stream';
import { JSONFormatter } from './JSONFormatter';

describe('JSONFormatter', () => {
it('streams the given JSON object into a buffer', async () => {
const formatter = JSONFormatter.Builder().build();
const stream = TestStream();
await formatter.stream(stream.writable, { foo: 1.2, bar: ['value'] });
expect(stream.getValue()).toEqual('{"foo":1.2,"bar":["value"]}');
});

it('includes whitespace if configured', async () => {
const formatter = JSONFormatter.Builder().withIndent(2).build();
const stream = TestStream();
await formatter.stream(stream.writable, { foo: 1.2, bar: ['value'] });
expect(stream.getValue()).toEqual(
'{\n "foo": 1.2,\n "bar": [\n "value"\n ]\n}',
);
});

it('ignores undefined properties', async () => {
const formatter = JSONFormatter.Builder().build();
const stream = TestStream();
await formatter.stream(stream.writable, {
foo: false,
bar: null,
baz: undefined,
list: [null, undefined],
});
expect(stream.getValue()).toEqual(
'{"foo":false,"bar":null,"list":[null,null]}',
);
});

it('consumes iterators in the input', async () => {
const formatter = JSONFormatter.Builder().build();
const stream = TestStream();
let advance = () => {};
const delay = new Promise<void>((resolve) => {
advance = resolve;
});
const itemGenerator = (async function* () {
yield 1;
yield 2;
await delay;
yield 3;
})();
const promise = formatter.stream(stream.writable, { list: itemGenerator });
await new Promise(process.nextTick);
expect(stream.getValue()).toEqual('{"list":[1,2');
advance();
await promise;
expect(stream.getValue()).toEqual('{"list":[1,2,3]}');
});
});

const TestStream = () => {
const chunks: string[] = [];
return {
writable: new Writable({
write(chunk, _, callback) {
chunks.push(chunk);
callback();
},
}),
getValue: () => chunks.join(''),
};
};
186 changes: 186 additions & 0 deletions backend/src/export/JSONFormatter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import type { Writable } from 'stream';

type JSONWriter<T> = (
value: T,
context: JSONFormatterContext,
) => Promise<void> | void;

interface JSONType<T> {
detector: (v: unknown) => v is T;
writer: JSONWriter<T>;
}

interface JSONFormatterContext {
write(v: string): void;
stream(object: unknown, nesting?: number): Promise<void>;
joiner(
begin: string,
end: string,
nesting?: number,
): { next: () => void; end: () => void };
newline(nesting?: number): void;
whitespace: boolean;
}

class Builder {
private readonly types: JSONType<unknown>[] = [];
private indent: number | string = '';
private nestingLimit: number = 20;

constructor(
private readonly target: (
...args: [JSONType<unknown>[], number | string, number]
) => JSONFormatter,
) {}

withType<T>(type: JSONType<T>) {
this.types.unshift(type as JSONType<unknown>);
return this;
}

withIndent(indent: number | string) {
this.indent = indent;
return this;
}

withNestingLimit(nestingLimit: number) {
this.nestingLimit = nestingLimit;
return this;
}

build() {
return this.target([...this.types], this.indent, this.nestingLimit);
}
}

export class JSONFormatter {
private readonly newline:
| ((target: Writable, nesting: number) => void)
| null;
private readonly whitespace: boolean;

private constructor(
private readonly types: JSONType<unknown>[],
indent: number | string,
nestingLimit: number,
) {
if (indent) {
const indentStep =
typeof indent === 'number' ? ' '.repeat(indent) : indent;
const newlines: string[] = [];
for (let i = 0; i <= nestingLimit; ++i) {
newlines.push('\n' + indentStep.repeat(i));
}
this.newline = (target: Writable, nesting: number) =>
target.write(newlines[Math.min(nesting, nestingLimit)]);
this.whitespace = true;
} else {
this.newline = null;
this.whitespace = false;
}
}

static Builder() {
return new Builder((...args) => new JSONFormatter(...args))
.withType(JSONFormatter.OBJECT)
.withType(JSONFormatter.ASYNC_ITERABLE)
.withType(JSONFormatter.ITERABLE);
}

private joiner(
target: Writable,
nesting: number,
begin: string,
end: string,
) {
let count = 0;
target.write(begin);
return {
next: () => {
if (count) {
target.write(',');
}
this.newline?.(target, nesting + 1);
++count;
},
end: () => {
if (count) {
this.newline?.(target, nesting);
}
target.write(end);
},
};
}

async stream(target: Writable, object: unknown, nesting: number = 0) {
if (target.writableNeedDrain) {
await awaitDrain(target);
}
for (const { detector, writer } of this.types) {
if (detector(object)) {
await writer(object, {
write: (v) => target.write(v),
stream: (subObject: unknown, subNesting: number = 0) =>
this.stream(target, subObject, nesting + subNesting),
joiner: (begin: string, end: string, subNesting: number = 0) =>
this.joiner(target, nesting + subNesting, begin, end),
newline: (subNesting: number = 0) =>
this.newline?.(target, nesting + subNesting),
whitespace: this.whitespace,
});
return;
}
}
target.write(JSON.stringify(object ?? null));
}

static readonly ITERABLE: JSONType<Iterable<unknown>> = {
detector: (object): object is Iterable<unknown> =>
Boolean(
object && typeof object === 'object' && Symbol.iterator in object,
),
async writer(object, { joiner, stream }) {
const j = joiner('[', ']');
for (const v of object) {
j.next();
await stream(v, 1);
}
j.end();
},
};

static readonly ASYNC_ITERABLE: JSONType<AsyncIterable<unknown>> = {
detector: (object): object is AsyncIterable<unknown> =>
Boolean(
object && typeof object === 'object' && Symbol.asyncIterator in object,
),
async writer(object, { joiner, stream }) {
const j = joiner('[', ']');
for await (const v of object) {
j.next();
await stream(v, 1);
}
j.end();
},
};

static readonly OBJECT: JSONType<object> = {
detector: (object): object is object =>
Boolean(object && typeof object === 'object'),
async writer(object, { write, joiner, stream, whitespace }) {
const j = joiner('{', '}');
for (const [k, v] of Object.entries(object)) {
if (v !== undefined) {
j.next();
write(JSON.stringify(k));
write(whitespace ? ': ' : ':');
await stream(v, 1);
}
}
j.end();
},
};
}

const awaitDrain = (target: Writable) =>
new Promise<void>((resolve) => target.once('drain', resolve));
Loading

0 comments on commit 2debac1

Please sign in to comment.