Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose a generic read method that can be extended #51

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,47 @@ avoid leaking file descriptors.
await reader.close();
```

### Reading data from a url

Parquet files can be read from a url without having to download the whole file.
You will have to supply the request library as a first argument and the request parameters
as a second argument to the function `parquetReader.openUrl`.

``` js
const request = require('request');
let reader = await parquet.ParquetReader.openUrl(request,'https://domain/fruits.parquet');
```

### Reading data from S3

Parquet files can be read from an S3 object without having to download the whole file.
You will have to supply the aws-sdk client as first argument and the bucket/key information
as second argument to the function `parquetReader.openS3`.

``` js
const AWS = require('aws-sdk');
const client = new AWS.S3({
accessKeyId: 'xxxxxxxxxxx',
secretAccessKey: 'xxxxxxxxxxx'
});

const params = {
Bucket: 'xxxxxxxxxxx',
Key: 'xxxxxxxxxxx'
};

let reader = await parquet.ParquetReader.openS3(client,params);
```

### Reading data from a buffer

If the complete parquet file is in buffer it can be read directly from memory without incurring any additional I/O.

``` js
const file = fs.readFileSync('fruits.parquet');
let reader = await parquet.ParquetReader.openBuffer(file);
```

Encodings
---------

Expand Down
97 changes: 96 additions & 1 deletion lib/reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,36 @@ class ParquetReader {
*/
static async openFile(filePath) {
let envelopeReader = await ParquetEnvelopeReader.openFile(filePath);
return this.openEnvelopeReader(envelopeReader);
}

static async openBuffer(buffer) {
let envelopeReader = await ParquetEnvelopeReader.openBuffer(buffer);
return this.openEnvelopeReader(envelopeReader);
}

/**
* Open the parquet file from S3 using the supplied aws client and params
* The params have to include `Bucket` and `Key` to the file requested
* This function returns a new parquet reader
*/
static async openS3(client, params) {
let envelopeReader = await ParquetEnvelopeReader.openS3(client, params);
return this.openEnvelopeReader(envelopeReader);
}

/**
* Open the parquet file from a url using the supplied request module
* params should either be a string (url) or an object that includes
* a `url` property.
* This function returns a new parquet reader
*/
static async openUrl(request, params) {
let envelopeReader = await ParquetEnvelopeReader.openUrl(request, params);
return this.openEnvelopeReader(envelopeReader);
}

static async openEnvelopeReader(envelopeReader) {
try {
await envelopeReader.readHeader();
let metadata = await envelopeReader.readFooter();
Expand Down Expand Up @@ -200,12 +229,75 @@ class ParquetEnvelopeReader {
return new ParquetEnvelopeReader(readFn, closeFn, fileStat.size);
}

static async openBuffer(buffer) {
let readFn = (offset, length) => buffer.slice(offset,offset+length);
let closeFn = () => ({});
return new ParquetEnvelopeReader(readFn, closeFn, buffer.length);
}

static async openS3(client, params) {
let fileStat = async () => client.headObject(params).promise().then(d => d.ContentLength);

let readFn = async (offset, length) => {
let Range = `bytes=${offset}-${offset+length-1}`;
let res = await client.getObject(Object.assign({Range}, params)).promise();
return res.Body;
};

let closeFn = () => ({});

return new ParquetEnvelopeReader(readFn, closeFn, fileStat);
}

static async openUrl(request, params) {
if (typeof params === 'string')
params = {url: params};
if (!params.url)
throw new Error('URL missing');

params.encoding = params.encoding || null;

let defaultHeaders = params.headers || {};

let filesize = async () => new Promise( (resolve, reject) => {
let req = request(params);
req.on('response', res => {
req.abort();
resolve(res.headers['content-length']);
});
req.on('error', reject);
});

let readFn = (offset, length) => {
let range = `bytes=${offset}-${offset+length-1}`;
let headers = Object.assign({}, defaultHeaders, {range});
let req = Object.assign({}, params, {headers});
return new Promise( (resolve, reject) => {
request(req, (err, res) => {
if (err) {
reject(err);
} else {
resolve(res.body);
}
});
});
};

let closeFn = () => ({});

return new ParquetEnvelopeReader(readFn, closeFn, filesize);
}

constructor(readFn, closeFn, fileSize) {
this.read = readFn;
this.readFn = readFn;
this.close = closeFn;
this.fileSize = fileSize;
}

async read(offset, length) {
return this.readFn(offset, length);
}

async readHeader() {
let buf = await this.read(0, PARQUET_MAGIC.length);

Expand Down Expand Up @@ -262,6 +354,9 @@ class ParquetEnvelopeReader {
}

async readFooter() {
if (typeof this.fileSize === 'function') {
this.fileSize = await this.fileSize();
}
let trailerLen = PARQUET_MAGIC.length + 4;
let trailerBuf = await this.read(this.fileSize - trailerLen, trailerLen);

Expand Down