From 70a67f1ed81c41114e4588cffe1b89b76f5b65f0 Mon Sep 17 00:00:00 2001 From: Ziggy Jonsson Date: Fri, 2 Mar 2018 08:44:27 -0500 Subject: [PATCH] Add statistics to pages and columns Default for all columns unless `statistics: false` in the field definition --- lib/reader.js | 52 +++++++++++++++- lib/schema.js | 2 + lib/shred.js | 7 ++- lib/util.js | 19 +++++- lib/writer.js | 96 ++++++++++++++++++++++++++++-- test/integration.js | 27 +++++++++ test/statistics.js | 140 ++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 335 insertions(+), 8 deletions(-) create mode 100644 test/statistics.js diff --git a/lib/reader.js b/lib/reader.js index 0e4778f8..e6de0675 100644 --- a/lib/reader.js +++ b/lib/reader.js @@ -7,6 +7,7 @@ const parquet_util = require('./util') const parquet_schema = require('./schema') const parquet_codec = require('./codec') const parquet_compression = require('./compression') +const parquet_types = require('./types'); /** * Parquet File Magic String @@ -118,6 +119,18 @@ class ParquetReader { this.schema = new parquet_schema.ParquetSchema( decodeSchema( this.metadata.schema.splice(1))); + + /* decode any statistics values */ + if (this.metadata.row_groups) { + this.metadata.row_groups.forEach(row => row.columns.forEach( col => { + const stats = col.meta_data.statistics; + if (stats) { + const field = this.schema.findField(col.meta_data.path_in_schema); + stats.max_value = decodeStatisticsValue(stats.max_value, field); + stats.min_value = decodeStatisticsValue(stats.min_value, field); + } + })); + } } /** @@ -294,6 +307,38 @@ function decodeValues(type, encoding, cursor, count, opts) { return parquet_codec[encoding].decodeValues(type, cursor, count, opts); } + +function decodeStatisticsValue(value, column) { + if (!value.length) { + return undefined; + } + if (!column.primitiveType.includes('BYTE_ARRAY')) { + value = decodeValues(column.primitiveType,'PLAIN',{buffer: Buffer.from(value), offset: 0}, 1, column); + if (value.length === 1) value = value[0]; + } + if (column.originalType) { + value = parquet_types.fromPrimitive(column.originalType, value); + } + return value; +} + +function decodeStatistics(statistics, column) { + if (!statistics) { + return; + } + if (statistics.min_value !== null) { + statistics.min_value = decodeStatisticsValue(statistics.min_value, column); + } + if (statistics.max_value !== null) { + statistics.max_value = decodeStatisticsValue(statistics.max_value, column); + } + + statistics.min = statistics.min_value; + statistics.max = statistics.max_value; + + return statistics; +} + function decodeDataPages(buffer, opts) { let cursor = { buffer: buffer, @@ -310,19 +355,24 @@ function decodeDataPages(buffer, opts) { while (cursor.offset < cursor.size) { const pageHeader = new parquet_thrift.PageHeader(); - cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset)); + cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset)); const pageType = parquet_util.getThriftEnum( parquet_thrift.PageType, pageHeader.type); let pageData = null; + switch (pageType) { case 'DATA_PAGE': + pageHeader.data_page_header.statistics = decodeStatistics(pageHeader.data_page_header.statistics, opts.column); pageData = decodeDataPage(cursor, pageHeader, opts); + break; case 'DATA_PAGE_V2': + pageHeader.data_page_header_v2.statistics = decodeStatistics(pageHeader.data_page_header_v2.statistics, opts.column); pageData = decodeDataPageV2(cursor, pageHeader, opts); + break; default: throw "invalid page type: " + pageType; diff --git a/lib/schema.js b/lib/schema.js index 9cb7fa30..897c3552 100644 --- a/lib/schema.js +++ b/lib/schema.js @@ -108,6 +108,7 @@ function buildFields(schema, rLevelParentMax, dLevelParentMax, path) { rLevelMax: rLevelMax, dLevelMax: dLevelMax, isNested: true, + statistics: opts.statistics, fieldCount: Object.keys(opts.fields).length, fields: buildFields( opts.fields, @@ -150,6 +151,7 @@ function buildFields(schema, rLevelParentMax, dLevelParentMax, path) { path: path.concat([name]), repetitionType: repetitionType, encoding: opts.encoding, + statistics: opts.statistics, compression: opts.compression, typeLength: opts.typeLength || typeDef.typeLength, rLevelMax: rLevelMax, diff --git a/lib/shred.js b/lib/shred.js index 5270de9f..bdb9a719 100644 --- a/lib/shred.js +++ b/lib/shred.js @@ -33,6 +33,7 @@ exports.shredRecord = function(schema, record, buffer) { dlevels: [], rlevels: [], values: [], + distinct_values: new Set(), count: 0 }; } @@ -51,6 +52,7 @@ exports.shredRecord = function(schema, record, buffer) { dlevels: [], rlevels: [], values: [], + distinct_values: new Set(), count: 0 }; buffer.pages[field.path] = []; @@ -72,9 +74,11 @@ exports.shredRecord = function(schema, record, buffer) { buffer.columnData[field.path].values, recordShredded[field.path].values); + [...recordShredded[field.path].distinct_values].forEach(value => buffer.columnData[field.path].distinct_values.add(value)); + buffer.columnData[field.path].count += recordShredded[field.path].count; } -} +}; function shredRecordInternal(fields, record, data, rlvl, dlvl) { for (let fieldName in fields) { @@ -129,6 +133,7 @@ function shredRecordInternal(fields, record, data, rlvl, dlvl) { rlvl_i, field.dLevelMax); } else { + data[field.path].distinct_values.add(values[i]); data[field.path].values.push(parquet_types.toPrimitive(fieldType, values[i])); data[field.path].rlevels.push(rlvl_i); data[field.path].dlevels.push(field.dLevelMax); diff --git a/lib/util.js b/lib/util.js index 91356ca9..a6d2e773 100644 --- a/lib/util.js +++ b/lib/util.js @@ -2,6 +2,23 @@ const fs = require('fs'); const thrift = require('thrift'); + +/** We need to use a patched version of TFramedTransport where + * readString returns the original buffer instead of a string if the + * buffer can not be safely encoded as utf8 (see http://bit.ly/2GXeZEF) + */ + +class fixedTFramedTransport extends thrift.TFramedTransport { + readString(len) { + this.ensureAvailable(len); + var buffer = this.inBuf.slice(this.readPos, this.readPos + len); + var str = this.inBuf.toString('utf8', this.readPos, this.readPos + len); + this.readPos += len; + return (Buffer.from(str).equals(buffer)) ? str : buffer; + } +} + + /** * Helper function that serializes a thrift object into a buffer */ @@ -24,7 +41,7 @@ exports.decodeThrift = function(obj, buf, offset) { offset = 0; } - var transport = new thrift.TFramedTransport(buf); + var transport = new fixedTFramedTransport(buf); transport.readPos = offset; var protocol = new thrift.TCompactProtocol(transport); obj.read(protocol); diff --git a/lib/writer.js b/lib/writer.js index bed89fb7..ccc76eb6 100644 --- a/lib/writer.js +++ b/lib/writer.js @@ -7,6 +7,7 @@ const parquet_shredder = require('./shred') const parquet_util = require('./util') const parquet_codec = require('./codec') const parquet_compression = require('./compression') +const parquet_types = require('./types'); /** * Parquet File Magic String @@ -293,6 +294,30 @@ function encodeValues(type, encoding, values, opts) { return parquet_codec[encoding].encodeValues(type, values, opts); } +function encodeStatisticsValue(value, column) { + if (value === undefined) { + return new Buffer(0); + } + if (column.originalType) { + value = parquet_types.toPrimitive(column.originalType,value); + } + if (column.primitiveType !== 'BYTE_ARRAY') { + value = encodeValues(column.primitiveType,'PLAIN',[value],column); + } + return value; +} + +function encodeStatistics(statistics,column) { + statistics = Object.assign({},statistics); + statistics.min_value = encodeStatisticsValue(statistics.min_value, column); + statistics.max_value = encodeStatisticsValue(statistics.max_value, column); + + statistics.max = statistics.max_value; + statistics.min = statistics.min_value; + + return new parquet_thrift.Statistics(statistics); +} + function encodePages(schema, rowBuffer, opts) { if (!rowBuffer.pageRowCount) { return; @@ -305,6 +330,23 @@ function encodePages(schema, rowBuffer, opts) { let page; const values = rowBuffer.columnData[field.path]; + + let statistics; + + if (field.statistics !== false) { + statistics = {}; + [...values.distinct_values].forEach( (v,i) => { + if (i === 0 || v > statistics.max_value) { + statistics.max_value = v; + } + if (i === 0 || v < statistics.min_value) { + statistics.min_value = v; + } + }); + + statistics.null_count = values.count - values.values.length; + statistics.distinct_count = values.distinct_values.size; + } if (opts.useDataPageV2) { page = encodeDataPageV2( @@ -313,18 +355,27 @@ function encodePages(schema, rowBuffer, opts) { rowBuffer.pageRowCount, values.values, values.rlevels, - values.dlevels); + values.dlevels, + statistics); } else { page = encodeDataPage( field, values.count, values.values, values.rlevels, - values.dlevels); + values.dlevels, + statistics); } - rowBuffer.pages[field.path].push({page, count: values.values.length }); + rowBuffer.pages[field.path].push({ + page, + statistics, + distinct_values: values.distinct_values, + count: values.values.length + }); + + values.distinct_values = new Set(); values.values = []; values.rlevels = []; values.dlevels = []; @@ -337,7 +388,7 @@ function encodePages(schema, rowBuffer, opts) { /** * Encode a parquet data page */ -function encodeDataPage(column, valueCount, values, rlevels, dlevels) { +function encodeDataPage(column, valueCount, values, rlevels, dlevels, statistics) { /* encode values */ let valuesBuf = encodeValues( column.primitiveType, @@ -374,6 +425,9 @@ function encodeDataPage(column, valueCount, values, rlevels, dlevels) { pageHeader.compressed_page_size = pageBody.length; pageHeader.data_page_header = new parquet_thrift.DataPageHeader(); pageHeader.data_page_header.num_values = rlevels.length; + if (column.statistics !== false) { + pageHeader.data_page_header.statistics = encodeStatistics(statistics, column); + } pageHeader.data_page_header.encoding = parquet_thrift.Encoding[column.encoding]; pageHeader.data_page_header.definition_level_encoding = @@ -388,7 +442,7 @@ function encodeDataPage(column, valueCount, values, rlevels, dlevels) { /** * Encode a parquet data page (v2) */ -function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels) { +function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels, statistics) { /* encode values */ let valuesBuf = encodeValues( column.primitiveType, @@ -433,6 +487,10 @@ function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels pageHeader.data_page_header_v2.num_nulls = valueCount - values.length; pageHeader.data_page_header_v2.num_rows = valueCount; + if (column.statistics !== false) { + pageHeader.data_page_header_v2.statistics = encodeStatistics(statistics, column); + } + pageHeader.uncompressed_page_size = rLevelsBuf.length + dLevelsBuf.length + valuesBuf.length; @@ -477,6 +535,34 @@ function encodeColumnChunk(pages, opts) { metadata.codec = parquet_thrift.CompressionCodec[ opts.useDataPageV2 ? opts.column.compression : 'UNCOMPRESSED']; + /* compile statistics */ + let statistics = {}; + let distinct_values = new Set(); + statistics.null_count = 0; + statistics.distinct_count = 0; + + + for (let i = 0; i < pages.length; i++) { + let page = pages[i]; + + if (opts.column.statistics !== false) { + + if (page.statistics.max_value > statistics.max_value || i == 0) { + statistics.max_value = page.statistics.max_value; + } + if (page.statistics.min_value < statistics.min_value || i == 0) { + statistics.min_value = page.statistics.min_value; + } + statistics.null_count += page.statistics.null_count; + page.distinct_values.forEach(value => distinct_values.add(value)); + } + } + + if (opts.column.statistics !== false) { + statistics.distinct_count = distinct_values.size; + metadata.statistics = encodeStatistics(statistics, opts.column); + } + /* list encodings */ let encodingsSet = {}; encodingsSet[PARQUET_RDLVL_ENCODING] = true; diff --git a/test/integration.js b/test/integration.js index 4e40273a..279d7e7c 100644 --- a/test/integration.js +++ b/test/integration.js @@ -151,6 +151,25 @@ async function verifyPages() { assert.equal(rowCount, column.column.meta_data.num_values); } +async function verifyStatistics() { + const column = await sampleColumnHeaders(); + const colStats = column.column.meta_data.statistics; + + assert.equal(colStats.max_value, 'oranges'); + assert.equal(colStats.min_value, 'apples'); + assert.equal(colStats.null_count, 0); + assert.equal(colStats.distinct_count, 4); + + column.pages.forEach( (d, i) => { + let header = d.data_page_header || d.data_page_header_v2; + let pageStats = header.statistics; + assert.equal(pageStats.null_count,0); + assert.equal(pageStats.distinct_count, 4); + assert.equal(pageStats.max_value, 'oranges'); + assert.equal(pageStats.min_value, 'apples'); + }); +} + async function readTestFile() { let reader = await parquet.ParquetReader.openFile('fruits.parquet'); assert.equal(reader.getRowCount(), TEST_NUM_ROWS * 4); @@ -345,6 +364,10 @@ describe('Parquet', function() { it('verify that data is split into pages', function() { return verifyPages(); }); + + it('verify statistics', function() { + return verifyStatistics(); + }); }); describe('with DataPageHeaderV2', function() { @@ -362,6 +385,10 @@ describe('Parquet', function() { return verifyPages(); }); + it('verify statistics', function() { + return verifyStatistics(); + }); + it('write a test file with GZIP compression', function() { const opts = { useDataPageV2: true, compression: 'GZIP' }; return writeTestFile(opts); diff --git a/test/statistics.js b/test/statistics.js new file mode 100644 index 00000000..2740d34e --- /dev/null +++ b/test/statistics.js @@ -0,0 +1,140 @@ +'use strict'; +const chai = require('chai'); +const assert = chai.assert; +const parquet = require('../parquet.js'); +const TEST_VTIME = new Date(); + +const schema = new parquet.ParquetSchema({ + name: { type: 'UTF8' }, + //quantity: { type: 'INT64', encoding: 'RLE', typeLength: 6, optional: true }, // parquet-mr actually doesnt support this + quantity: { type: 'INT64', optional: true }, + price: { type: 'DOUBLE' }, + date: { type: 'TIMESTAMP_MICROS' }, + day: { type: 'DATE' }, + finger: { type: 'FIXED_LEN_BYTE_ARRAY', typeLength: 5 }, + inter: { type: 'INTERVAL', statistics: false }, + stock: { + repeated: true, + fields: { + quantity: { type: 'INT64', repeated: true }, + warehouse: { type: 'UTF8' }, + } + }, + colour: { type: 'UTF8', repeated: true }, + meta_json: { type: 'BSON', optional: true, statistics: false}, +}); + + +describe('statistics', async function() { + before(async function(){ + let writer = await parquet.ParquetWriter.openFile(schema, 'fruits-statistics.parquet', {pageSize: 3}); + + writer.appendRow({ + name: 'apples', + quantity: 10, + price: 2.6, + day: new Date('2017-11-26'), + date: new Date(TEST_VTIME + 1000), + finger: "FNORD", + inter: { months: 10, days: 5, milliseconds: 777 }, + stock: [ + { quantity: 10, warehouse: "A" }, + { quantity: 20, warehouse: "B" } + ], + colour: [ 'green', 'red' ] + }); + + writer.appendRow({ + name: 'oranges', + quantity: 20, + price: 2.7, + day: new Date('2018-03-03'), + date: new Date(TEST_VTIME + 2000), + finger: "ABCDE", + inter: { months: 42, days: 23, milliseconds: 777 }, + stock: { + quantity: [50, 33, 34, 35, 36], + warehouse: "X" + }, + colour: [ 'orange' ] + }); + + writer.appendRow({ + name: 'kiwi', + price: 4.2, + quantity: 15, + day: new Date('2008-11-26'), + date: new Date(TEST_VTIME + 8000), + finger: "XCVBN", + inter: { months: 60, days: 1, milliseconds: 99 }, + stock: [ + { quantity: 42, warehouse: "f" }, + { quantity: 21, warehouse: "x" } + ], + colour: [ 'green', 'brown', 'yellow' ], + meta_json: { expected_ship_date: TEST_VTIME } + }); + + writer.appendRow({ + name: 'banana', + price: 3.2, + day: new Date('2017-11-26'), + date: new Date(TEST_VTIME + 6000), + finger: "FNORD", + inter: { months: 1, days: 15, milliseconds: 888 }, + colour: [ 'yellow'], + meta_json: { shape: 'curved' } + }); + + await writer.close(); + }); + + it('column statistics should match input', async function() { + let reader = await parquet.ParquetReader.openFile('fruits-statistics.parquet'); + const row = reader.metadata.row_groups[0]; + const rowStats = (path) => row.columns.find(d => d.meta_data.path_in_schema.join(',') == path).meta_data.statistics; + + assert.equal(rowStats('name').min_value,'apples'); + assert.equal(rowStats('name').max_value,'oranges'); + assert.equal(+rowStats('name').distinct_count,4); + assert.equal(+rowStats('name').null_count,0); + + assert.equal(rowStats('quantity').min_value,10); + assert.equal(rowStats('quantity').max_value,20); + assert.equal(+rowStats('quantity').distinct_count,3); + assert.equal(+rowStats('quantity').null_count,1); + + assert.equal(rowStats('price').min_value, 2.6); + assert.equal(rowStats('price').max_value, 4.2); + assert.equal(+rowStats('price').distinct_count, 4); + assert.equal(+rowStats('price').null_count, 0); + + assert.deepEqual(rowStats('day').min_value, new Date('2008-11-26')); + assert.deepEqual(rowStats('day').max_value, new Date('2018-03-03')); + assert.equal(+rowStats('day').distinct_count, 4); + assert.equal(+rowStats('day').null_count, 0); + + assert.deepEqual(rowStats('finger').min_value, 'ABCDE'); + assert.deepEqual(rowStats('finger').max_value, 'XCVBN'); + assert.equal(+rowStats('finger').distinct_count, 3); + assert.equal(+rowStats('finger').null_count, 0); + + assert.deepEqual(rowStats('stock,quantity').min_value, 10); + assert.deepEqual(rowStats('stock,quantity').max_value, 50); + assert.equal(+rowStats('stock,quantity').distinct_count, 9); + assert.equal(+rowStats('stock,quantity').null_count, 1); + + assert.deepEqual(rowStats('stock,warehouse').min_value, 'A'); + assert.deepEqual(rowStats('stock,warehouse').max_value, 'x'); + assert.equal(+rowStats('stock,warehouse').distinct_count, 5); + assert.equal(+rowStats('stock,warehouse').null_count, 1); + + assert.deepEqual(rowStats('colour').min_value, 'brown'); + assert.deepEqual(rowStats('colour').max_value, 'yellow'); + assert.equal(+rowStats('colour').distinct_count, 5); + assert.equal(+rowStats('colour').null_count, 0); + + assert.equal(rowStats('inter'), null); + assert.equal(rowStats('meta_json'), null); + }); +}); \ No newline at end of file