Skip to content

Commit

Permalink
Merge pull request #10 from crowemi/add-parquet-validation
Browse files Browse the repository at this point in the history
Add parquet validation
  • Loading branch information
crowemi authored Apr 13, 2023
2 parents 73783a1 + 2b4c575 commit 1c0f90c
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 44 deletions.
34 changes: 4 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,6 @@

Build with the [Meltano Target SDK](https://sdk.meltano.com).

<!--
Developer TODO: Update the below as needed to correctly describe the install procedure. For instance, if you do not have a PyPi repo, or if you want users to directly install from your git repo, you can modify this step as appropriate.
## Installation
Install from PyPi:
```bash
pipx install target-s3
```
Install from GitHub:
```bash
pipx install git+https://github.com/ORG_NAME/target-s3.git@main
```
-->

## Configuration

Expand All @@ -32,7 +13,9 @@ pipx install git+https://github.com/ORG_NAME/target-s3.git@main
{
"format": {
"format_type": "json",
"format_parquet": {},
"format_parquet": {
"validate": "[true/false]"
},
"format_json": {},
"format_csv": {}
},
Expand All @@ -57,17 +40,8 @@ pipx install git+https://github.com/ORG_NAME/target-s3.git@main
"flatten_records": false
}
```
`format.format_parquet.validate` [`Boolean`, default: `False`] - this flag determines whether the data types of incoming records should be validated. When set to `True`, a schema is built from the first record, and any subsequent values whose data types don't match the schema are cast to the expected type.


<!--
Developer TODO: Provide a list of config options accepted by the target.
This section can be created by copy-pasting the CLI output from:
```
target-s3 --about --format=markdown
```
-->
## Capabilities

* `about`
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "target-s3"
version = "0.5.4"
version = "1.0.1"
description = "`target-s3` is a Singer target for s3, built with the Meltano Singer SDK."
authors = ["crowemi"]
keywords = [
Expand Down
4 changes: 3 additions & 1 deletion sample-config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
{
"format": {
"format_type": "json",
"format_parquet": {},
"format_parquet": {
"validate": false
},
"format_json": {},
"format_csv": {}
},
Expand Down
74 changes: 63 additions & 11 deletions target_s3/formats/format_parquet.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pyarrow
from pyarrow import fs
from pyarrow import fs, Table
from pyarrow.parquet import ParquetWriter

from target_s3.formats.format_base import FormatBase
Expand Down Expand Up @@ -39,31 +39,83 @@ def create_filesystem(
self.logger.error(e)
raise e

def validate(self, schema: dict, field, value):
    """Validate *value* for *field* against the running *schema*.

    The schema is built incrementally: the first value seen for a field
    records its type (and, for dicts, the types of its nested keys).
    Later values whose type disagrees are cast to the recorded type.

    :param schema: mutable mapping ``{field: {"type": ..., "fields": ...}}``
        shared across all records of a batch; updated in place.
    :param field: the record key being validated.
    :param value: the raw value for ``field``; may be any JSON-ish type.
    :return: the (possibly cast) value, or ``None`` for an empty dict,
        which pyarrow cannot represent as a struct.
    """

    def unpack_dict(record: dict) -> dict:
        # Build a nested {key: {"type": ...}} schema fragment from a dict.
        # Bug fix: the original indexed `value[field]` via closure instead of
        # the `record` being unpacked, so nested dicts used the wrong object.
        ret = {}
        for key, val in record.items():
            if isinstance(val, dict):
                ret[key] = {"type": dict, "fields": unpack_dict(val)}
            else:
                ret[key] = {"type": type(val)}
        return ret

    def validate_dict(value: dict) -> dict:
        # Validate each key of a dict value against the fields recorded for
        # `field`, adding unseen keys and casting mismatched scalar values.
        fields = schema[field].get("fields", {})
        for key, val in value.items():
            if key not in fields:
                # First time we see this key: record its type.
                if isinstance(val, dict):
                    fields[key] = {"type": dict, "fields": unpack_dict(val)}
                else:
                    fields[key] = {"type": type(val)}
            elif not isinstance(val, dict):
                expected_type = fields[key].get("type")
                if expected_type is not None and not isinstance(val, expected_type):
                    # Cast mismatched value; this shouldn't normally happen —
                    # an error would occur during target instantiation.
                    value[key] = expected_type(val)
            # NOTE(bug fix): the original overwrote `value[v]` with schema
            # metadata (`unpack_dict(value[field])`) here, destroying the
            # record's data. Known nested dicts are now left intact.
        return value

    if field in schema:
        # Field already known: make sure datatypes align.
        if isinstance(value, dict):
            if not value:
                # pyarrow can't process an empty struct, return None.
                return None
            return validate_dict(value)
        expected_type = schema[field].get("type")
        if expected_type is not None and not isinstance(value, expected_type):
            value = expected_type(value)
        return value

    # First occurrence of this field: add a new schema entry.
    if isinstance(value, dict):
        if not value:
            # Bug fix: empty structs are dropped consistently, even on the
            # first occurrence of a field (pyarrow can't process them).
            return None
        schema[field] = {"type": type(value), "fields": unpack_dict(value)}
    else:
        schema[field] = {"type": type(value)}
    return value

def create_dataframe(self) -> Table:
    """Create a pyarrow Table from the accumulated record set.

    Column names are the union of keys across all records; records missing
    a key contribute ``None``. When ``format_parquet.validate`` is enabled,
    each value is passed through :meth:`validate` so types stay consistent.

    :return: a :class:`pyarrow.Table` built from ``self.records``.
    :raises Exception: re-raises any failure after logging it.
    """
    try:
        # Union of keys across all records -> column names.
        fields = set()
        for record in self.records:
            fields = fields.union(record.keys())

        # Bug fix: the original chained .get("format_parquet", None).get(...)
        # which raises AttributeError when "format_parquet" is absent/None.
        format_parquet = self.format.get("format_parquet") or {}
        if format_parquet.get("validate"):
            # Build a schema from the first record; cast later mismatches.
            schema = dict()
            mapping = {
                f: [self.validate(schema, f, row.get(f)) for row in self.records]
                for f in fields
            }
        else:
            mapping = {f: [row.get(f) for row in self.records] for f in fields}

        ret = Table.from_pydict(mapping=mapping)
    except Exception as e:
        self.logger.error("Failed to create parquet dataframe.")
        self.logger.error(e)
        raise e

    return ret

def _prepare_records(self):
# use default behavior, no additional prep needed
Expand Down
9 changes: 8 additions & 1 deletion target_s3/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ class Targets3(Target):
),
th.Property(
"format_parquet",
th.ObjectType(),
th.ObjectType(
th.Property(
"validate",
th.BooleanType,
required=False,
default=False,
),
),
required=False,
),
th.Property(
Expand Down

0 comments on commit 1c0f90c

Please sign in to comment.