YDA-4236 fix intake module prefix issues
When searching for a collection and its subcollections,
one needs to search for both "/zoneName/myCollection"
and "/zoneName/myCollection/%". Searching for '/zoneName/myCollection%"
is incorrect, since this will also match collections with the same
prefix (e.g. "/zoneName/myCollection2"), which can cause unexpected
results when processing collections with a common prefix.
stsnel committed Oct 6, 2023
1 parent 17df0ea commit ce10bd0
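For reference, a minimal sketch of the query pattern this commit applies, assuming the genquery row_iterator API used throughout the diff below; the helper name iterate_coll_and_subcolls is hypothetical:

import itertools

import genquery


def iterate_coll_and_subcolls(ctx, coll):
    """Yield (COLL_NAME, DATA_NAME) rows for a collection and its subcollections.

    An exact match selects the collection itself; a "prefix + /%" wildcard
    selects everything strictly below it. A bare "prefix + %" would be wrong,
    since it also matches sibling collections that share the prefix
    (e.g. coll + "2").
    """
    main_collection_iterator = genquery.row_iterator(
        "COLL_NAME, DATA_NAME",
        "COLL_NAME = '" + coll + "'",
        genquery.AS_LIST, ctx
    )
    subcollection_iterator = genquery.row_iterator(
        "COLL_NAME, DATA_NAME",
        "COLL_NAME like '" + coll + "/%'",
        genquery.AS_LIST, ctx
    )
    return itertools.chain(main_collection_iterator, subcollection_iterator)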
Showing 4 changed files with 139 additions and 69 deletions.
80 changes: 61 additions & 19 deletions intake.py
@@ -5,6 +5,7 @@
__license__ = 'GPLv3, see LICENSE'

import fnmatch
import itertools
import time
import traceback

@@ -113,15 +114,20 @@ def api_intake_count_total_files(ctx, coll):
:returns: Total file count
"""
# Include coll name as equal names do occur and genquery delivers distinct results.
iter = genquery.row_iterator(
main_collection_iterator = genquery.row_iterator(
"COLL_NAME, DATA_NAME",
"COLL_NAME = '" + coll + "'",
genquery.AS_LIST, ctx
)

subcollection_iterator = genquery.row_iterator(
"COLL_NAME, DATA_NAME",
"COLL_NAME like '" + coll + "%'",
"COLL_NAME like '" + coll + "/%'",
genquery.AS_LIST, ctx
)

count = 0
for row in iter:
for row in itertools.chain(main_collection_iterator, subcollection_iterator):
exclusion_matched = any(fnmatch.fnmatch(row[1], p) for p in INTAKE_FILE_EXCLUSION_PATTERNS)
if not exclusion_matched:
count += 1
@@ -151,14 +157,20 @@ def api_intake_list_unrecognized_files(ctx, coll):
return {}

# Include coll name as equal names do occur and genquery delivers distinct results.
iter = genquery.row_iterator(
main_collection_iterator = genquery.row_iterator(
"COLL_NAME, DATA_NAME, COLL_CREATE_TIME, DATA_OWNER_NAME",
"COLL_NAME = '" + coll + "' AND META_DATA_ATTR_NAME = 'unrecognized'",
genquery.AS_LIST, ctx
)

subcollection_iterator = genquery.row_iterator(
"COLL_NAME, DATA_NAME, COLL_CREATE_TIME, DATA_OWNER_NAME",
"COLL_NAME like '" + coll + "%' AND META_DATA_ATTR_NAME = 'unrecognized'",
"COLL_NAME like '" + coll + "/%' AND META_DATA_ATTR_NAME = 'unrecognized'",
genquery.AS_LIST, ctx
)

files = []
for row in iter:
for row in itertools.chain(main_collection_iterator, subcollection_iterator):
# Check whether object type is within exclusion pattern
exclusion_matched = any(fnmatch.fnmatch(row[1], p) for p in INTAKE_FILE_EXCLUSION_PATTERNS)
if not exclusion_matched:
@@ -202,22 +214,36 @@ def api_intake_list_datasets(ctx, coll):
datasets = []

# 1) Query for datasets distinguished by collections
iter = genquery.row_iterator(
c_main_collection_iterator = genquery.row_iterator(
"META_COLL_ATTR_VALUE, COLL_NAME",
"COLL_NAME like '" + coll + "%' AND META_COLL_ATTR_NAME = 'dataset_toplevel' ",
"COLL_NAME = '" + coll + "' AND META_COLL_ATTR_NAME = 'dataset_toplevel' ",
genquery.AS_LIST, ctx
)
for row in iter:

c_subcollection_iterator = genquery.row_iterator(
"META_COLL_ATTR_VALUE, COLL_NAME",
"COLL_NAME LIKE '" + coll + "/%' AND META_COLL_ATTR_NAME = 'dataset_toplevel' ",
genquery.AS_LIST, ctx
)

for row in itertools.chain(c_main_collection_iterator, c_subcollection_iterator):
dataset = get_dataset_details(ctx, row[0], row[1])
datasets.append(dataset)

# 2) Query for datasets distinguished dataobjects
iter = genquery.row_iterator(
d_main_collection_iterator = genquery.row_iterator(
"META_DATA_ATTR_VALUE, COLL_NAME",
"COLL_NAME like '" + coll + "%' AND META_DATA_ATTR_NAME = 'dataset_toplevel' ",
"COLL_NAME = '" + coll + "' AND META_DATA_ATTR_NAME = 'dataset_toplevel' ",
genquery.AS_LIST, ctx
)
for row in iter:

d_subcollection_iterator = genquery.row_iterator(
"META_DATA_ATTR_VALUE, COLL_NAME",
"COLL_NAME LIKE '" + coll + "/%' AND META_DATA_ATTR_NAME = 'dataset_toplevel' ",
genquery.AS_LIST, ctx
)

for row in itertools.chain(d_main_collection_iterator, d_subcollection_iterator):
dataset = get_dataset_details(ctx, row[0], row[1])
datasets.append(dataset)

@@ -368,25 +394,41 @@ def get_dataset_toplevel_objects(ctx, root, dataset_id):
:returns: Dict holding objects for the dataset
"""
iter = genquery.row_iterator(
c_main_collection_iterator = genquery.row_iterator(
"COLL_NAME",
"COLL_NAME LIKE '" + root + "%' AND META_COLL_ATTR_NAME = 'dataset_toplevel' "
"COLL_NAME = '" + root + "' AND META_COLL_ATTR_NAME = 'dataset_toplevel' "
"AND META_COLL_ATTR_VALUE = '" + dataset_id + "'",
genquery.AS_LIST, ctx
)
for row in iter:

c_subcollection_iterator = genquery.row_iterator(
"COLL_NAME",
"COLL_NAME LIKE '" + root + "/%' AND META_COLL_ATTR_NAME = 'dataset_toplevel' "
"AND META_COLL_ATTR_VALUE = '" + dataset_id + "'",
genquery.AS_LIST, ctx
)

for row in itertools.chain(c_main_collection_iterator, c_subcollection_iterator):
return {'is_collection': True,
'objects': [row[0]]}

# For dataobject situation gather all object path strings as a list
iter = genquery.row_iterator(
d_main_collection_iterator = genquery.row_iterator(
"DATA_NAME, COLL_NAME",
"COLL_NAME = '" + root + "' AND META_DATA_ATTR_NAME = 'dataset_toplevel' "
"AND META_DATA_ATTR_VALUE = '" + dataset_id + "'",
genquery.AS_LIST, ctx
)

d_subcollection_iterator = genquery.row_iterator(
"DATA_NAME, COLL_NAME",
"COLL_NAME like '" + root + "%' AND META_DATA_ATTR_NAME = 'dataset_toplevel' "
"COLL_NAME LIKE '" + root + "/%' AND META_DATA_ATTR_NAME = 'dataset_toplevel' "
"AND META_DATA_ATTR_VALUE = '" + dataset_id + "'",
genquery.AS_LIST, ctx
)

objects = []
for row in iter:
for row in itertools.chain(d_main_collection_iterator, d_subcollection_iterator):
objects.append(row[1] + '/' + row[0])
return {'is_collection': False,
'objects': objects}
51 changes: 35 additions & 16 deletions intake_dataset.py
@@ -4,6 +4,8 @@
__copyright__ = 'Copyright (c) 2019-2021, Utrecht University'
__license__ = 'GPLv3, see LICENSE'

import itertools

import genquery

from util import *
@@ -27,12 +29,16 @@ def intake_report_export_study_data(ctx, study_id):
"""
zone = user.zone(ctx)

result = genquery.row_iterator("COLL_NAME, COLL_PARENT_NAME, META_COLL_ATTR_NAME, META_COLL_ATTR_VALUE",
"COLL_NAME like '/{}/home/grp-vault-{}%' AND META_COLL_ATTR_NAME IN ('dataset_id', 'dataset_date_created', 'wave', 'version', 'experiment_type', 'pseudocode')".format(zone, study_id),
genquery.AS_LIST, ctx)
main_collection_iterator = genquery.row_iterator("COLL_NAME, COLL_PARENT_NAME, META_COLL_ATTR_NAME, META_COLL_ATTR_VALUE",
" = '/{}/home/grp-vault-{}' AND META_COLL_ATTR_NAME IN ('dataset_id', 'dataset_date_created', 'wave', 'version', 'experiment_type', 'pseudocode')".format(zone, study_id),
genquery.AS_LIST, ctx)

subcollection_iterator = genquery.row_iterator("COLL_NAME, COLL_PARENT_NAME, META_COLL_ATTR_NAME, META_COLL_ATTR_VALUE",
"COLL_NAME like '/{}/home/grp-vault-{}/%' AND META_COLL_ATTR_NAME IN ('dataset_id', 'dataset_date_created', 'wave', 'version', 'experiment_type', 'pseudocode')".format(zone, study_id),
genquery.AS_LIST, ctx)

datasets = {}
for row in result:
for row in itertools.chain(main_collection_iterator, subcollection_iterator):
path = row[0]
try:
datasets[path][row[2]] = row[3]
@@ -48,10 +54,15 @@ def intake_report_export_study_data(ctx, study_id):
real_datasets[set_path]['totalFiles'] = 0

# get the filesize and file count
result = genquery.row_iterator("count(DATA_ID), sum(DATA_SIZE)",
"COLL_NAME like '{}%'".format(set_path),
genquery.AS_LIST, ctx)
for row in result:
stat_main_collection_iterator = genquery.row_iterator("count(DATA_ID), sum(DATA_SIZE)",
"COLL_NAME = '{}'".format(set_path),
genquery.AS_LIST, ctx)

stat_subcollection_iterator = genquery.row_iterator("count(DATA_ID), sum(DATA_SIZE)",
"COLL_NAME like '{}/%'".format(set_path),
genquery.AS_LIST, ctx)

for row in itertools.chain(stat_main_collection_iterator, stat_subcollection_iterator):
real_datasets[set_path]['totalFiles'] = int(row[0]) / 2
totalFileSize = 0
if row[1]:
@@ -77,14 +88,18 @@ def intake_youth_get_datasets_in_study(ctx, study_id):
"""
zone = user.zone(ctx)

result = genquery.row_iterator("COLL_NAME, COLL_PARENT_NAME, META_COLL_ATTR_NAME, META_COLL_ATTR_VALUE",
"COLL_NAME like '/{}/home/grp-vault-{}%' AND META_COLL_ATTR_NAME IN ('dataset_id', 'dataset_date_created', 'wave', 'version', 'experiment_type', 'pseudocode')".format(zone, study_id),
genquery.AS_LIST, ctx)
main_collection_iterator = genquery.row_iterator("COLL_NAME, COLL_PARENT_NAME, META_COLL_ATTR_NAME, META_COLL_ATTR_VALUE",
"COLL_NAME = '/{}/home/grp-vault-{}' AND META_COLL_ATTR_NAME IN ('dataset_id', 'dataset_date_created', 'wave', 'version', 'experiment_type', 'pseudocode')".format(zone, study_id),
genquery.AS_LIST, ctx)

subcollection_iterator = genquery.row_iterator("COLL_NAME, COLL_PARENT_NAME, META_COLL_ATTR_NAME, META_COLL_ATTR_VALUE",
"COLL_NAME LIKE '/{}/home/grp-vault-{}/*' AND META_COLL_ATTR_NAME IN ('dataset_id', 'dataset_date_created', 'wave', 'version', 'experiment_type', 'pseudocode')".format(zone, study_id),
genquery.AS_LIST, ctx)

datasets = {}

# Construct all datasets.
for row in result:
for row in itertools.chain(main_collection_iterator, subcollection_iterator):
dataset = row[0]
attribute_name = row[2]
attribute_value = row[3]
@@ -206,11 +221,15 @@ def vault_aggregated_info(ctx, study_id):
continue

zone = user.zone(ctx)
result = genquery.row_iterator("DATA_NAME, COLL_NAME, DATA_SIZE, COLL_CREATE_TIME",
"COLL_NAME like '/{}/home/grp-vault-{}%'".format(zone, study_id),
genquery.AS_LIST, ctx)
main_collection_iterator = genquery.row_iterator("DATA_NAME, COLL_NAME, DATA_SIZE, COLL_CREATE_TIME",
"COLL_NAME = '/{}/home/grp-vault-{}'".format(zone, study_id),
genquery.AS_LIST, ctx)

subcollection_iterator = genquery.row_iterator("DATA_NAME, COLL_NAME, DATA_SIZE, COLL_CREATE_TIME",
"COLL_NAME like '/{}/home/grp-vault-{}/%'".format(zone, study_id),
genquery.AS_LIST, ctx)

for row in result:
for row in itertools.chain(main_collection_iterator, subcollection_iterator):
coll_name = row[1]
data_size = int(row[2])
coll_create_time = int(row[3])
62 changes: 35 additions & 27 deletions intake_scan.py
@@ -4,6 +4,7 @@
__copyright__ = 'Copyright (c) 2019-2021, Utrecht University'
__license__ = 'GPLv3, see LICENSE'

import itertools
import time

import genquery
@@ -293,22 +294,18 @@ def dataset_get_ids(ctx, coll):
data_ids = set()

# Get distinct data_ids
iter = genquery.row_iterator(
main_collection_iterator = genquery.row_iterator(
"META_DATA_ATTR_VALUE",
"COLL_NAME = '" + coll + "' AND META_DATA_ATTR_NAME = 'dataset_id' ",
genquery.AS_LIST, ctx
)
for row in iter:
if row[0]:
data_ids.add(row[0])

# Get distinct data_ids
iter = genquery.row_iterator(
subcollection_iterator = genquery.row_iterator(
"META_DATA_ATTR_VALUE",
"COLL_NAME LIKE '" + coll + "%' AND META_DATA_ATTR_NAME = 'dataset_id' ",
"COLL_NAME LIKE '" + coll + "/%' AND META_DATA_ATTR_NAME = 'dataset_id' ",
genquery.AS_LIST, ctx
)
for row in iter:
for row in itertools.chain(main_collection_iterator, subcollection_iterator):
if row[0]:
data_ids.add(row[0])

@@ -359,7 +356,7 @@ def intake_check_dataset(ctx, root, dataset_id):
else:
avu.set_on_data(ctx, tl, "object_count", str(count))

count = get_aggregated_object_error_count(ctx, dataset_id, tl)
count = get_aggregated_object_error_count(ctx, tl)
if is_collection:
avu.set_on_coll(ctx, tl, "object_errors", str(count))
else:
@@ -396,24 +393,19 @@ def get_rel_paths_objects(ctx, root, dataset_id):
except Exception:
parent_coll = '/'

"""
iter = genquery.row_iterator(
main_collection_iterator = genquery.row_iterator(
"DATA_NAME, COLL_NAME",
"COLL_NAME = '" + parent_coll + "' AND META_DATA_ATTR_NAME = 'dataset_id' AND META_DATA_ATTR_VALUE = '" + dataset_id + "' ",
genquery.AS_LIST, ctx
)
for row in iter:
# add objects residing in parent_coll directly to list
log.write(ctx, "DIRECT " + row[0])
rel_path_objects.append(row[0])
"""

iter = genquery.row_iterator(
subcollection_iterator = genquery.row_iterator(
"DATA_NAME, COLL_NAME",
"COLL_NAME LIKE '" + parent_coll + "%' AND META_DATA_ATTR_NAME = 'dataset_id' AND META_DATA_ATTR_VALUE = '" + dataset_id + "' ",
"COLL_NAME LIKE '" + parent_coll + "/%' AND META_DATA_ATTR_NAME = 'dataset_id' AND META_DATA_ATTR_VALUE = '" + dataset_id + "' ",
genquery.AS_LIST, ctx
)
for row in iter:

for row in itertools.chain(main_collection_iterator, subcollection_iterator):
# Add objects including relative paths
rel_path_objects.append(row[1][len(parent_coll):] + '/' + row[0])

@@ -429,25 +421,41 @@ def get_aggregated_object_count(ctx, dataset_id, tl_collection):
:returns: Aggregated object count
"""
return len(list(genquery.row_iterator(
main_collection_iterator = genquery.row_iterator(
"DATA_ID",
"COLL_NAME like '" + tl_collection + "%' AND META_DATA_ATTR_NAME = 'dataset_id' "
"COLL_NAME = '" + tl_collection + "' AND META_DATA_ATTR_NAME = 'dataset_id' "
"AND META_DATA_ATTR_VALUE = '" + dataset_id + "' ",
genquery.AS_LIST, ctx
)))
)

subcollection_iterator = genquery.row_iterator(
"DATA_ID",
"COLL_NAME like '" + tl_collection + "/%' AND META_DATA_ATTR_NAME = 'dataset_id' "
"AND META_DATA_ATTR_VALUE = '" + dataset_id + "' ",
genquery.AS_LIST, ctx
)

return len(list(main_collection_iterator) + list(subcollection_iterator))


def get_aggregated_object_error_count(ctx, dataset_id, tl_collection):
def get_aggregated_object_error_count(ctx, tl_collection):
"""Return total amount of object errors.
:param ctx: Combined type of a callback and rei struct
:param dataset_id: Dataset id
:param tl_collection: Collection name of top level
:returns: Total amount of object errors
"""
return len(list(genquery.row_iterator(
main_collection_iterator = genquery.row_iterator(
"DATA_ID",
"COLL_NAME like '" + tl_collection + "%' AND META_DATA_ATTR_NAME = 'error' ",
"COLL_NAME = '" + tl_collection + "' AND META_DATA_ATTR_NAME = 'error' ",
genquery.AS_LIST, ctx
)))
)

subcollection_iterator = genquery.row_iterator(
"DATA_ID",
"COLL_NAME like '" + tl_collection + "/%' AND META_DATA_ATTR_NAME = 'error' ",
genquery.AS_LIST, ctx
)

return len(list(main_collection_iterator) + list(subcollection_iterator))