Skip to content

Commit

Permalink
Add statistics to pages and columns
Browse files Browse the repository at this point in the history
Statistics are written by default for all columns unless `statistics: false` is set in the field definition
  • Loading branch information
ZJONSSON committed Mar 3, 2018
1 parent 366375a commit 84737b6
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 8 deletions.
52 changes: 51 additions & 1 deletion lib/reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const parquet_util = require('./util')
const parquet_schema = require('./schema')
const parquet_codec = require('./codec')
const parquet_compression = require('./compression')
const parquet_types = require('./types');

/**
* Parquet File Magic String
Expand Down Expand Up @@ -118,6 +119,18 @@ class ParquetReader {
this.schema = new parquet_schema.ParquetSchema(
decodeSchema(
this.metadata.schema.splice(1)));

/* decode any statistics values */
if (this.metadata.row_groups) {
this.metadata.row_groups.forEach(row => row.columns.forEach( col => {
const stats = col.meta_data.statistics;
if (stats) {
const field = this.schema.findField(col.meta_data.path_in_schema);
stats.max_value = decodeStatisticsValue(stats.max_value, field);
stats.min_value = decodeStatisticsValue(stats.min_value, field);
}
}));
}
}

/**
Expand Down Expand Up @@ -294,6 +307,38 @@ function decodeValues(type, encoding, cursor, count, opts) {
return parquet_codec[encoding].decodeValues(type, cursor, count, opts);
}


/**
 * Decode one raw statistics value (a min or a max) into its JavaScript form.
 *
 * @param {*} value - raw value as read from the statistics struct; anything
 *   nullish or with a zero `length` is treated as "no value recorded"
 * @param {Object} column - schema field; `primitiveType` and `originalType`
 *   are read here and the whole object is forwarded to decodeValues as options
 * @returns {*} the decoded value, or undefined when there is nothing to decode
 */
function decodeStatisticsValue(value, column) {
  /* `== null` matches both null and undefined, so an absent field no longer
   * throws on the `.length` access below */
  if (value == null || !value.length) {
    return undefined;
  }

  let decoded = value;

  /* any primitive type whose name contains BYTE_ARRAY (this also matches
   * FIXED_LEN_BYTE_ARRAY) is passed through untouched; everything else is
   * PLAIN-decoded from a fresh cursor over the raw bytes */
  if (!column.primitiveType.includes('BYTE_ARRAY')) {
    const cursor = { buffer: Buffer.from(value), offset: 0 };
    decoded = decodeValues(column.primitiveType, 'PLAIN', cursor, 1, column);

    /* unwrap the single-element result */
    if (decoded.length === 1) {
      decoded = decoded[0];
    }
  }

  /* convert the primitive into its logical (original) type when one is set */
  if (column.originalType) {
    decoded = parquet_types.fromPrimitive(column.originalType, decoded);
  }

  return decoded;
}

/**
 * Decode the min/max values of a statistics object in place.
 *
 * `min_value` and `max_value` are each decoded unless they are exactly null,
 * and the results are then copied onto the `min` / `max` properties.
 *
 * @param {Object} statistics - statistics object to decode (mutated)
 * @param {Object} column - schema field forwarded to decodeStatisticsValue
 * @returns {Object|undefined} the same statistics object, or undefined when
 *   `statistics` is falsy
 */
function decodeStatistics(statistics, column) {
  if (!statistics) {
    return;
  }

  /* min first, then max */
  for (const key of ['min_value', 'max_value']) {
    const raw = statistics[key];
    if (raw !== null) {
      statistics[key] = decodeStatisticsValue(raw, column);
    }
  }

  /* mirror the decoded values onto the `min` / `max` properties */
  statistics.min = statistics.min_value;
  statistics.max = statistics.max_value;

  return statistics;
}

function decodeDataPages(buffer, opts) {
let cursor = {
buffer: buffer,
Expand All @@ -310,19 +355,24 @@ function decodeDataPages(buffer, opts) {

while (cursor.offset < cursor.size) {
const pageHeader = new parquet_thrift.PageHeader();
cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset));
cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset));

const pageType = parquet_util.getThriftEnum(
parquet_thrift.PageType,
pageHeader.type);

let pageData = null;

switch (pageType) {
case 'DATA_PAGE':
pageHeader.data_page_header.statistics = decodeStatistics(pageHeader.data_page_header.statistics, opts.column);
pageData = decodeDataPage(cursor, pageHeader, opts);

break;
case 'DATA_PAGE_V2':
pageHeader.data_page_header_v2.statistics = decodeStatistics(pageHeader.data_page_header_v2.statistics, opts.column);
pageData = decodeDataPageV2(cursor, pageHeader, opts);

break;
default:
throw "invalid page type: " + pageType;
Expand Down
2 changes: 2 additions & 0 deletions lib/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ function buildFields(schema, rLevelParentMax, dLevelParentMax, path) {
rLevelMax: rLevelMax,
dLevelMax: dLevelMax,
isNested: true,
statistics: opts.statistics,
fieldCount: Object.keys(opts.fields).length,
fields: buildFields(
opts.fields,
Expand Down Expand Up @@ -150,6 +151,7 @@ function buildFields(schema, rLevelParentMax, dLevelParentMax, path) {
path: path.concat([name]),
repetitionType: repetitionType,
encoding: opts.encoding,
statistics: opts.statistics,
compression: opts.compression,
typeLength: opts.typeLength || typeDef.typeLength,
rLevelMax: rLevelMax,
Expand Down
7 changes: 6 additions & 1 deletion lib/shred.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ exports.shredRecord = function(schema, record, buffer) {
dlevels: [],
rlevels: [],
values: [],
distinct_values: new Set(),
count: 0
};
}
Expand All @@ -51,6 +52,7 @@ exports.shredRecord = function(schema, record, buffer) {
dlevels: [],
rlevels: [],
values: [],
distinct_values: new Set(),
count: 0
};
buffer.pages[field.path] = [];
Expand All @@ -72,9 +74,11 @@ exports.shredRecord = function(schema, record, buffer) {
buffer.columnData[field.path].values,
recordShredded[field.path].values);

[...recordShredded[field.path].distinct_values].forEach(value => buffer.columnData[field.path].distinct_values.add(value));

buffer.columnData[field.path].count += recordShredded[field.path].count;
}
}
};

function shredRecordInternal(fields, record, data, rlvl, dlvl) {
for (let fieldName in fields) {
Expand Down Expand Up @@ -129,6 +133,7 @@ function shredRecordInternal(fields, record, data, rlvl, dlvl) {
rlvl_i,
field.dLevelMax);
} else {
data[field.path].distinct_values.add(values[i]);
data[field.path].values.push(parquet_types.toPrimitive(fieldType, values[i]));
data[field.path].rlevels.push(rlvl_i);
data[field.path].dlevels.push(field.dLevelMax);
Expand Down
19 changes: 18 additions & 1 deletion lib/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,23 @@
const fs = require('fs');
const thrift = require('thrift');


/** We need to use a patched version of TFramedTransport where
* readString returns the original buffer instead of a string if the
* buffer can not be safely encoded as utf8 (see http://bit.ly/2GXeZEF)
*/

class fixedTFramedTransport extends thrift.TFramedTransport {
  /**
   * Read `len` bytes from the input buffer and advance the read position.
   *
   * @param {number} len - number of bytes to consume
   * @returns {string|Buffer} the utf8 string when re-encoding it reproduces
   *   the original bytes exactly, otherwise the raw byte slice
   */
  readString(len) {
    this.ensureAvailable(len);

    const start = this.readPos;
    const end = start + len;
    const rawBytes = this.inBuf.slice(start, end);
    const decoded = this.inBuf.toString('utf8', start, end);
    this.readPos = end;

    /* only hand back a string if the utf8 round trip is lossless */
    if (Buffer.from(decoded).equals(rawBytes)) {
      return decoded;
    }
    return rawBytes;
  }
}


/**
* Helper function that serializes a thrift object into a buffer
*/
Expand All @@ -24,7 +41,7 @@ exports.decodeThrift = function(obj, buf, offset) {
offset = 0;
}

var transport = new thrift.TFramedTransport(buf);
var transport = new fixedTFramedTransport(buf);
transport.readPos = offset;
var protocol = new thrift.TCompactProtocol(transport);
obj.read(protocol);
Expand Down
96 changes: 91 additions & 5 deletions lib/writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const parquet_shredder = require('./shred')
const parquet_util = require('./util')
const parquet_codec = require('./codec')
const parquet_compression = require('./compression')
const parquet_types = require('./types');

/**
* Parquet File Magic String
Expand Down Expand Up @@ -293,6 +294,30 @@ function encodeValues(type, encoding, values, opts) {
return parquet_codec[encoding].encodeValues(type, values, opts);
}

/**
 * Encode one statistics value (a min or a max) for storage in the metadata.
 *
 * @param {*} value - the value to encode; undefined means "no value"
 * @param {Object} column - schema field; `originalType` and `primitiveType`
 *   are read here and the whole object is forwarded to encodeValues as options
 * @returns {*} an empty Buffer when `value` is undefined; the (possibly
 *   toPrimitive-converted) value itself for BYTE_ARRAY columns; otherwise the
 *   result of PLAIN-encoding the value
 */
function encodeStatisticsValue(value, column) {
  if (value === undefined) {
    /* Buffer.alloc replaces the deprecated `new Buffer(size)` constructor */
    return Buffer.alloc(0);
  }

  let encoded = value;

  /* convert a logical (original) type into its primitive representation */
  if (column.originalType) {
    encoded = parquet_types.toPrimitive(column.originalType, encoded);
  }

  /* BYTE_ARRAY values are passed through as-is; all other primitive types
   * are PLAIN-encoded as a single-element value list */
  if (column.primitiveType !== 'BYTE_ARRAY') {
    encoded = encodeValues(column.primitiveType, 'PLAIN', [encoded], column);
  }

  return encoded;
}

/**
 * Build a thrift Statistics struct from a plain statistics object.
 *
 * The input object is not modified: its properties are copied, the min/max
 * values are encoded, and the encoded values are also stored under the
 * `max` / `min` keys.
 *
 * @param {Object} statistics - plain statistics (min_value, max_value, ...)
 * @param {Object} column - schema field forwarded to encodeStatisticsValue
 * @returns {parquet_thrift.Statistics} the populated thrift struct
 */
function encodeStatistics(statistics,column) {
  const source = Object.assign({}, statistics);

  /* min is encoded before max */
  const encodedMin = encodeStatisticsValue(source.min_value, column);
  const encodedMax = encodeStatisticsValue(source.max_value, column);

  const encoded = Object.assign(source, {
    min_value: encodedMin,
    max_value: encodedMax,
    max: encodedMax,
    min: encodedMin
  });

  return new parquet_thrift.Statistics(encoded);
}

function encodePages(schema, rowBuffer, opts) {
if (!rowBuffer.pageRowCount) {
return;
Expand All @@ -305,6 +330,23 @@ function encodePages(schema, rowBuffer, opts) {

let page;
const values = rowBuffer.columnData[field.path];

let statistics;

if (field.statistics !== false) {
statistics = {};
[...values.distinct_values].forEach( (v,i) => {
if (i === 0 || v > statistics.max_value) {
statistics.max_value = v;
}
if (i === 0 || v < statistics.min_value) {
statistics.min_value = v;
}
});

statistics.null_count = values.count - values.values.length;
statistics.distinct_count = values.distinct_values.size;
}

if (opts.useDataPageV2) {
page = encodeDataPageV2(
Expand All @@ -313,18 +355,27 @@ function encodePages(schema, rowBuffer, opts) {
rowBuffer.pageRowCount,
values.values,
values.rlevels,
values.dlevels);
values.dlevels,
statistics);
} else {
page = encodeDataPage(
field,
values.count,
values.values,
values.rlevels,
values.dlevels);
values.dlevels,
statistics);
}

rowBuffer.pages[field.path].push({page, count: values.values.length });
rowBuffer.pages[field.path].push({
page,
statistics,
distinct_values: values.distinct_values,
count: values.values.length
});


values.distinct_values = new Set();
values.values = [];
values.rlevels = [];
values.dlevels = [];
Expand All @@ -337,7 +388,7 @@ function encodePages(schema, rowBuffer, opts) {
/**
* Encode a parquet data page
*/
function encodeDataPage(column, valueCount, values, rlevels, dlevels) {
function encodeDataPage(column, valueCount, values, rlevels, dlevels, statistics) {
/* encode values */
let valuesBuf = encodeValues(
column.primitiveType,
Expand Down Expand Up @@ -374,6 +425,9 @@ function encodeDataPage(column, valueCount, values, rlevels, dlevels) {
pageHeader.compressed_page_size = pageBody.length;
pageHeader.data_page_header = new parquet_thrift.DataPageHeader();
pageHeader.data_page_header.num_values = rlevels.length;
if (column.statistics !== false) {
pageHeader.data_page_header.statistics = encodeStatistics(statistics, column);
}

pageHeader.data_page_header.encoding = parquet_thrift.Encoding[column.encoding];
pageHeader.data_page_header.definition_level_encoding =
Expand All @@ -388,7 +442,7 @@ function encodeDataPage(column, valueCount, values, rlevels, dlevels) {
/**
* Encode a parquet data page (v2)
*/
function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels) {
function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels, statistics) {
/* encode values */
let valuesBuf = encodeValues(
column.primitiveType,
Expand Down Expand Up @@ -433,6 +487,10 @@ function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels
pageHeader.data_page_header_v2.num_nulls = valueCount - values.length;
pageHeader.data_page_header_v2.num_rows = valueCount;

if (column.statistics !== false) {
pageHeader.data_page_header_v2.statistics = encodeStatistics(statistics, column);
}

pageHeader.uncompressed_page_size =
rLevelsBuf.length + dLevelsBuf.length + valuesBuf.length;

Expand Down Expand Up @@ -477,6 +535,34 @@ function encodeColumnChunk(pages, opts) {
metadata.codec = parquet_thrift.CompressionCodec[
opts.useDataPageV2 ? opts.column.compression : 'UNCOMPRESSED'];

/* compile statistics */
let statistics = {};
let distinct_values = new Set();
statistics.null_count = 0;
statistics.distinct_count = 0;


for (let i = 0; i < pages.length; i++) {
let page = pages[i];

if (opts.column.statistics !== false) {

if (page.statistics.max_value > statistics.max_value || i == 0) {
statistics.max_value = page.statistics.max_value;
}
if (page.statistics.min_value < statistics.min_value || i == 0) {
statistics.min_value = page.statistics.min_value;
}
statistics.null_count += page.statistics.null_count;
page.distinct_values.forEach(value => distinct_values.add(value));
}
}

if (opts.column.statistics !== false) {
statistics.distinct_count = distinct_values.size;
metadata.statistics = encodeStatistics(statistics, opts.column);
}

/* list encodings */
let encodingsSet = {};
encodingsSet[PARQUET_RDLVL_ENCODING] = true;
Expand Down
27 changes: 27 additions & 0 deletions test/integration.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,25 @@ async function verifyPages() {
assert.equal(rowCount, column.column.meta_data.num_values);
}

/**
 * Check the statistics of the sampled column: first the column chunk
 * metadata, then the header of every page (v1 or v2, whichever is present).
 */
async function verifyStatistics() {
  const sample = await sampleColumnHeaders();
  const chunkStats = sample.column.meta_data.statistics;

  /* column chunk level */
  assert.equal(chunkStats.max_value, 'oranges');
  assert.equal(chunkStats.min_value, 'apples');
  assert.equal(chunkStats.null_count, 0);
  assert.equal(chunkStats.distinct_count, 4);

  /* page level */
  for (const page of sample.pages) {
    const header = page.data_page_header || page.data_page_header_v2;
    const { statistics: pageStats } = header;

    assert.equal(pageStats.null_count,0);
    assert.equal(pageStats.distinct_count, 4);
    assert.equal(pageStats.max_value, 'oranges');
    assert.equal(pageStats.min_value, 'apples');
  }
}

async function readTestFile() {
let reader = await parquet.ParquetReader.openFile('fruits.parquet');
assert.equal(reader.getRowCount(), TEST_NUM_ROWS * 4);
Expand Down Expand Up @@ -345,6 +364,10 @@ describe('Parquet', function() {
it('verify that data is split into pages', function() {
return verifyPages();
});

it('verify statistics', function() {
return verifyStatistics();
});
});

describe('with DataPageHeaderV2', function() {
Expand All @@ -362,6 +385,10 @@ describe('Parquet', function() {
return verifyPages();
});

it('verify statistics', function() {
return verifyStatistics();
});

it('write a test file with GZIP compression', function() {
const opts = { useDataPageV2: true, compression: 'GZIP' };
return writeTestFile(opts);
Expand Down
Loading

0 comments on commit 84737b6

Please sign in to comment.