Skip to content

Commit

Permalink
Merge pull request #51 from NaturalHistoryMuseum/ginger/transform-data
Browse files Browse the repository at this point in the history
Data transformations configured via request args
  • Loading branch information
alycejenni authored Mar 21, 2022
2 parents ecc0113 + 4b471a4 commit 09eecd0
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 12 deletions.
17 changes: 11 additions & 6 deletions ckanext/versioned_datastore/lib/downloads/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .jsonl import jsonl_writer
from .sv import sv_writer
from .utils import calculate_field_counts
from .transform import Transform
from .. import common
from ..datastore_utils import trim_index_name, prefix_resource
from ...interfaces import IVersionedDatastoreDownloads
Expand Down Expand Up @@ -58,15 +59,15 @@

def queue_download(email_address, download_id, query_hash, query, query_version, search,
resource_ids_and_versions, separate_files, file_format, format_args,
ignore_empty_fields):
ignore_empty_fields, transform):
'''
Queues a job which when run will download the data for the resource.
:return: the queued job
'''
request = DownloadRequest(email_address, download_id, query_hash, query, query_version, search,
resource_ids_and_versions, separate_files, file_format, format_args,
ignore_empty_fields)
ignore_empty_fields, transform)
# pass a timeout of 1 hour (3600 seconds)
return toolkit.enqueue_job(download, args=[request], queue='download', title=str(request),
rq_kwargs={'timeout': 3600})
Expand All @@ -82,7 +83,7 @@ class DownloadRequest(object):

def __init__(self, email_address, download_id, query_hash, query, query_version, search,
resource_ids_and_versions, separate_files, file_format, format_args,
ignore_empty_fields):
ignore_empty_fields, transform):
self.email_address = email_address
self.download_id = download_id
self.query_hash = query_hash
Expand All @@ -94,6 +95,7 @@ def __init__(self, email_address, download_id, query_hash, query, query_version,
self.file_format = file_format
self.format_args = format_args
self.ignore_empty_fields = ignore_empty_fields
self.transform = transform

@property
def resource_ids(self):
Expand All @@ -120,6 +122,7 @@ def generate_download_hash(self):
self.file_format,
self.format_args,
self.ignore_empty_fields,
self.transform,
]
download_hash = hashlib.sha1('|'.join(map(str, to_hash)).encode('utf-8'))
return download_hash.hexdigest()
Expand Down Expand Up @@ -165,6 +168,9 @@ def download(request):
sniff_on_connection_fail=True, sniff_timeout=10,
http_compress=False, timeout=30)

for plugin in PluginImplementations(IVersionedDatastoreDownloads):
request = plugin.download_before_write(request)

# this manifest will be written out as JSON and put in the download zip
manifest = {
'download_id': request.download_id,
Expand All @@ -173,6 +179,7 @@ def download(request):
'file_format': request.file_format,
'format_args': request.format_args,
'ignore_empty_fields': request.ignore_empty_fields,
'transform': request.transform,
}
# calculate, per resource, the number of values for each field present in the search
field_counts = calculate_field_counts(request, es_client)
Expand All @@ -183,9 +190,6 @@ def download(request):
# keep track of the resource record counts
resource_counts = {}

for plugin in PluginImplementations(IVersionedDatastoreDownloads):
request = plugin.download_before_write(request)

with writer_function(request, target_dir, field_counts) as writer:
# handle each resource individually. We could search across all resources at the same
# but we don't need to seeing as we're not doing sorting here. By handling each index
Expand All @@ -201,6 +205,7 @@ def download(request):
total_records = 0
for hit in search.scan():
data = hit.data.to_dict()
data = Transform.transform_data(data, request.transform)
resource_id = trim_index_name(hit.meta.index)
# call the write function returned by our format specific writer context manager
writer(hit, data, resource_id)
Expand Down
1 change: 1 addition & 0 deletions ckanext/versioned_datastore/lib/downloads/dwc/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def initialise(self, record):
return
# check 'type' is right
valid_types = ['StillImage', 'MovingImage', 'Sound', 'PhysicalObject', 'Event', 'Text']
self._core_writer.fieldnames = sorted(self._core_writer.fieldnames)
if 'type' in record and record['type'] not in valid_types:
self._core_writer.fieldnames = [f for f in self._core_writer.fieldnames if f != 'type']
self._core_writer.writeheader()
Expand Down
4 changes: 2 additions & 2 deletions ckanext/versioned_datastore/lib/downloads/dwc/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ def write(hit, data, resource_id):
else:
archive = open_files[resource_id]

archive.initialise(data)

if request.ignore_empty_fields:
data = filter_data_fields(data, field_counts[resource_id])

archive.initialise(data)

# always add the resource ID
data['datasetID'] = resource_id

Expand Down
40 changes: 40 additions & 0 deletions ckanext/versioned_datastore/lib/downloads/transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import logging

from ckan.plugins import toolkit

log = logging.getLogger(__name__)
base_url = toolkit.config.get('ckan.site_url')
object_endpoint = toolkit.config.get('ckanext.versioned_datastore.record_view_endpoint', 'object.view')


class Transform:
    '''
    Namespace for data transformations applied to download records. Each transformation is
    opted into via the ``config`` dict passed to :meth:`transform_data` (sourced from the
    download request args).
    '''

    @classmethod
    def transform_data(cls, data, config):
        '''
        Apply the transformations described by config to a single record.

        Currently the only supported transformation is 'id_as_url', whose value is a dict
        optionally containing a 'field' key naming the ID field (default: 'id').

        :param data: the record data dict
        :param config: a dict of transformation options; may be None or empty, in which case
                       the data is returned unchanged
        :return: the (possibly) transformed record data
        '''
        # tolerate a missing/None config rather than raising AttributeError
        config = config or {}
        id_as_url = config.get('id_as_url')
        if id_as_url:
            field = id_as_url.get('field', 'id')
            data = cls.id_as_url(data, field)
        return data

    @classmethod
    def id_as_url(cls, data, field):
        '''
        Reformat an ID field as a URL (probably one that links to that record). Requires an endpoint
        (config option ckanext.versioned_datastore.record_view_endpoint, default 'object.view')
        taking the ID field as the 'uuid' named argument.
        :param data: the record data to be transformed
        :param field: the name of the data field that contains the ID and that will contain the URL
        :return: the transformed data (or untransformed if there was an error).
        '''
        try:
            object_id = data.get(field)
            if object_id is None or object_id == '':
                log.error(f'Failed to get uuid from field "{field}".')
                return data
            url = toolkit.url_for(object_endpoint, uuid=object_id)
        except Exception:
            # best-effort: log and return the record untouched, but don't swallow
            # SystemExit/KeyboardInterrupt like the previous bare except did
            log.error('Failed to generate URL from ID.', exc_info=True)
            return data
        data[field] = base_url + url
        return data
10 changes: 7 additions & 3 deletions ckanext/versioned_datastore/logic/actions/downloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
@action(schema.datastore_queue_download(), help.datastore_queue_download)
def datastore_queue_download(email_address, context, query=None, query_version=None, version=None,
resource_ids=None, resource_ids_and_versions=None, separate_files=True,
format='csv', format_args=None, ignore_empty_fields=True):
format='csv', format_args=None, ignore_empty_fields=True,
transform=None):
'''
Starts a download of the data found by the given query parameters. This download is created
asynchronously using the rq background job queue and a link to the results is emailed to the
Expand Down Expand Up @@ -47,6 +48,7 @@ def datastore_queue_download(email_address, context, query=None, query_version=N
:param format_args: additional arguments for specific formats, e.g. extension names for DwC-A.
:param ignore_empty_fields: whether to ignore fields with no data in them in the result set
and not write them into the download file(s). Default: True.
:param transform: data transformation configuration.
:return: a dict containing info about the background job that is doing the downloading and the
download id
'''
Expand Down Expand Up @@ -88,12 +90,14 @@ def datastore_queue_download(email_address, context, query=None, query_version=N
query_hash = hash_query(query, query_version)

format_args = format_args or {}
transform = transform or {}

options = {
'separate_files': separate_files,
'format': format,
'format_args': format_args,
'ignore_empty_fields': ignore_empty_fields
'ignore_empty_fields': ignore_empty_fields,
'transform': transform
}
download = DatastoreDownload(query_hash=query_hash, query=query, query_version=query_version,
resource_ids_and_versions=rounded_resource_ids_and_versions,
Expand All @@ -102,7 +106,7 @@ def datastore_queue_download(email_address, context, query=None, query_version=N

job = queue_download(email_address, download.id, query_hash, query, query_version,
search.to_dict(), rounded_resource_ids_and_versions, separate_files,
format, format_args, ignore_empty_fields)
format, format_args, ignore_empty_fields, transform)

return {
'queued_at': job.enqueued_at.isoformat(),
Expand Down
3 changes: 2 additions & 1 deletion ckanext/versioned_datastore/logic/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ def datastore_queue_download():
'separate_files': [ignore_missing, boolean_validator],
'format': [ignore_missing, str],
'ignore_empty_fields': [ignore_missing, boolean_validator],
'format_args': [ignore_missing, json_validator]
'format_args': [ignore_missing, json_validator],
'transform': [ignore_missing, json_validator],
}


Expand Down

0 comments on commit 09eecd0

Please sign in to comment.