ZTF matchfiles: imap_unordered + doc cleanup fix (#270)
* use imap_unordered properly and avoid issues with doc cleanup

* avoid dtype-related issues when ingesting FITS files
Theodlz authored Jan 31, 2024
1 parent 3283a0d commit b9da46d
Showing 2 changed files with 48 additions and 9 deletions.
20 changes: 19 additions & 1 deletion kowalski/ingesters/ingest_catalog.py
@@ -91,6 +91,11 @@ def process_file(argument_list: Sequence):
nhdu = 1
names = hdulist[nhdu].columns.names
dataframe = pd.DataFrame(np.asarray(hdulist[nhdu].data), columns=names)
for col in dataframe.columns:
if dataframe[col].dtype in [">f4", ">f8"]:
dataframe[col] = dataframe[col].astype(float)
elif dataframe[col].dtype in [">i2", ">i4", ">i8"]:
dataframe[col] = dataframe[col].astype(int)

if max_docs is not None and isinstance(max_docs, int):
dataframe = dataframe.iloc[:max_docs]
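The new cast matters because FITS binary tables are stored big-endian, so the columns that come out of np.asarray(hdulist[nhdu].data) carry dtypes such as >f8, and BSON cannot encode numpy scalars directly. A minimal, self-contained sketch of the same normalization, written against a hypothetical structured array rather than the repository's actual ingest path, and keyed on byte order instead of an explicit dtype list:

    import numpy as np
    import pandas as pd

    # Hypothetical stand-in for a FITS binary table: columns come out big-endian.
    raw = np.array([(1, 2.5), (2, 3.75)], dtype=[("id", ">i4"), ("mag", ">f8")])
    df = pd.DataFrame(raw)

    # Same idea as the diff above, generalized to any big-endian column.
    for col in df.columns:
        if df[col].dtype.byteorder == ">":
            df[col] = df[col].astype(int if df[col].dtype.kind == "i" else float)

    print(df.dtypes)  # id and mag are now native-endian int64 / float64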
@@ -171,6 +176,8 @@ def process_file(argument_list: Sequence):
# GeoJSON for 2D indexing
document["coordinates"] = dict()
# string format: H:M:S, D:M:S
if document[ra_col] == 360.0:
document[ra_col] = 0.0
document["coordinates"]["radec_str"] = [
deg2hms(document[ra_col]),
deg2dms(document[dec_col]),
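The RA == 360.0 guard exists because 360 degrees is the same sky position as 0 degrees; left unwrapped, it would produce a 24h sexagesimal string from deg2hms and push the GeoJSON longitude to the edge of its allowed range. A hedged sketch of the wrap and the point construction, assuming the ra - 180 longitude convention used for radec_geojson elsewhere in Kowalski:

    # Hedged sketch: wrap RA before building the GeoJSON point.
    def radec_to_geojson(ra: float, dec: float) -> dict:
        if ra == 360.0:  # same point on the sky as RA = 0
            ra = 0.0
        return {"type": "Point", "coordinates": [ra - 180.0, dec]}

    print(radec_to_geojson(360.0, -12.5))
    # {'type': 'Point', 'coordinates': [-180.0, -12.5]}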
@@ -181,6 +188,17 @@ def process_file(argument_list: Sequence):
"type": "Point",
"coordinates": _radec_geojson,
}

# if the entry is a string, strip it
# if it ends up being an empty string, it will be dropped
keys_to_pop = []
for key, value in document.items():
if isinstance(value, str):
document[key] = value.strip()
if not document[key] or len(document[key]) == 0:
keys_to_pop.append(key)
for key in keys_to_pop:
document.pop(key)
except Exception as e:
log(str(e))
bad_document_indexes.append(document_index)
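For illustration, this is the effect of the new string clean-up on a toy document; the field names are made up, only the strip-and-drop behaviour is the point:

    doc = {"ra": 10.1, "field": "  683 ", "comment": "   "}
    keys_to_pop = []
    for key, value in doc.items():
        if isinstance(value, str):
            doc[key] = value.strip()
            if not doc[key]:
                keys_to_pop.append(key)
    for key in keys_to_pop:
        doc.pop(key)
    print(doc)  # {'ra': 10.1, 'field': '683'} -- the all-whitespace 'comment' is dropped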
@@ -292,7 +310,7 @@ def process_file(argument_list: Sequence):
mongo.insert_many(collection=collection, documents=batch)

elif format == "parquet":
df = pq.read_table(file).to_pandas()
df: pd.DataFrame = pq.read_table(file).to_pandas()
for name in list(df.columns):
if name.startswith("_"):
df.rename(columns={name: name[1:]}, inplace=True)
37 changes: 29 additions & 8 deletions kowalski/ingesters/ingest_ztf_matchfiles.py
@@ -153,16 +153,30 @@ def process_file(argument_list: Sequence):

def clean_up_document(document):
"""Format passed in dicts for Mongo insertion"""
# we found some edge cases where the documents already had a "coordinates" field
# which was empty. If that happens, remove it
if (
"coordinates" in document
and type(document["coordinates"]) == dict
and len(document["coordinates"]) == 0
):
document.pop("coordinates")
# convert types for pymongo:
for k, v in document.items():
if k != "data":
if k in sources_int_fields:
document[k] = int(document[k])
else:
document[k] = float(document[k])
if k not in ("ra", "dec"):
# this will save a lot of space:
document[k] = round(document[k], 3)
try:
if k != "data":
if k in sources_int_fields:
document[k] = int(document[k])
else:
document[k] = float(document[k])
if k not in ("ra", "dec"):
# this will save a lot of space:
document[k] = round(document[k], 3)
except Exception as e:
log(
f"Failed to convert {k} to int or float: {e}, with value {str(v)}"
)
raise e

# generate unique _id:
document["_id"] = baseid + document["matchid"]
@@ -176,6 +190,8 @@ def clean_up_document(document):
document["coordinates"] = dict()
_ra = document["ra"]
_dec = document["dec"]
if _ra == 360.0:
_ra = 0.0
_radec_str = [deg2hms(_ra), deg2dms(_dec)]
document["coordinates"]["radec_str"] = _radec_str
# for GeoJSON, must be lon:[-180, 180], lat:[-90, 90] (i.e. in deg)
@@ -516,6 +532,11 @@ def run(
for _ in tqdm(pool.imap(process_file, input_list), total=len(files)):
pass

with multiprocessing.Pool(processes=num_proc) as pool:
with tqdm(total=len(files)) as pbar:
for _ in pool.imap_unordered(process_file, input_list):
pbar.update(1)


if __name__ == "__main__":
fire.Fire(run)
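Replacing the tqdm-wrapped pool.imap loop with pool.imap_unordered and an explicit progress bar means the bar advances as soon as any worker finishes, instead of waiting on the slowest file still pending in submission order. A minimal, standalone sketch of the pattern with a toy worker and toy inputs, not the ingester's actual arguments:

    import multiprocessing
    import random
    import time

    from tqdm import tqdm


    def process_item(x):
        time.sleep(random.uniform(0.0, 0.2))  # simulate uneven per-file work
        return x * x


    if __name__ == "__main__":
        inputs = list(range(32))
        with multiprocessing.Pool(processes=4) as pool:
            with tqdm(total=len(inputs)) as pbar:
                for _ in pool.imap_unordered(process_item, inputs):
                    pbar.update(1)  # results arrive in completion order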
