diff --git a/kowalski/ingesters/ingest_catalog.py b/kowalski/ingesters/ingest_catalog.py index ff971fa0..9c626be6 100644 --- a/kowalski/ingesters/ingest_catalog.py +++ b/kowalski/ingesters/ingest_catalog.py @@ -91,6 +91,11 @@ def process_file(argument_list: Sequence): nhdu = 1 names = hdulist[nhdu].columns.names dataframe = pd.DataFrame(np.asarray(hdulist[nhdu].data), columns=names) + for col in dataframe.columns: + if dataframe[col].dtype in [">f4", ">f8"]: + dataframe[col] = dataframe[col].astype(float) + elif dataframe[col].dtype in [">i2", ">i4", ">i8"]: + dataframe[col] = dataframe[col].astype(int) if max_docs is not None and isinstance(max_docs, int): dataframe = dataframe.iloc[:max_docs] @@ -171,6 +176,8 @@ def process_file(argument_list: Sequence): # GeoJSON for 2D indexing document["coordinates"] = dict() # string format: H:M:S, D:M:S + if document[ra_col] == 360.0: + document[ra_col] = 0.0 document["coordinates"]["radec_str"] = [ deg2hms(document[ra_col]), deg2dms(document[dec_col]), @@ -181,6 +188,17 @@ def process_file(argument_list: Sequence): "type": "Point", "coordinates": _radec_geojson, } + + # if the entry is a string, strip it + # if it ends up being an empty string, it will be dropped + keys_to_pop = [] + for key, value in document.items(): + if isinstance(value, str): + document[key] = value.strip() + if not document[key] or len(document[key]) == 0: + keys_to_pop.append(key) + for key in keys_to_pop: + document.pop(key) except Exception as e: log(str(e)) bad_document_indexes.append(document_index) @@ -292,7 +310,7 @@ def process_file(argument_list: Sequence): mongo.insert_many(collection=collection, documents=batch) elif format == "parquet": - df = pq.read_table(file).to_pandas() + df: pd.DataFrame = pq.read_table(file).to_pandas() for name in list(df.columns): if name.startswith("_"): df.rename(columns={name: name[1:]}, inplace=True) diff --git a/kowalski/ingesters/ingest_ztf_matchfiles.py b/kowalski/ingesters/ingest_ztf_matchfiles.py index bc834051..0ec04817 100644 --- a/kowalski/ingesters/ingest_ztf_matchfiles.py +++ b/kowalski/ingesters/ingest_ztf_matchfiles.py @@ -153,16 +153,30 @@ def process_file(argument_list: Sequence): def clean_up_document(document): """Format passed in dicts for Mongo insertion""" + # we found some edge cases where the documents already had a "coordinates" field + # which was empty. If that happens, remove it + if ( + "coordinates" in document + and type(document["coordinates"]) == dict + and len(document["coordinates"]) == 0 + ): + document.pop("coordinates") # convert types for pymongo: for k, v in document.items(): - if k != "data": - if k in sources_int_fields: - document[k] = int(document[k]) - else: - document[k] = float(document[k]) - if k not in ("ra", "dec"): - # this will save a lot of space: - document[k] = round(document[k], 3) + try: + if k != "data": + if k in sources_int_fields: + document[k] = int(document[k]) + else: + document[k] = float(document[k]) + if k not in ("ra", "dec"): + # this will save a lot of space: + document[k] = round(document[k], 3) + except Exception as e: + log( + f"Failed to convert {k} to int or float: {e}, with value {str(v)}" + ) + raise e # generate unique _id: document["_id"] = baseid + document["matchid"] @@ -176,6 +190,8 @@ def clean_up_document(document): document["coordinates"] = dict() _ra = document["ra"] _dec = document["dec"] + if _ra == 360.0: + _ra = 0.0 _radec_str = [deg2hms(_ra), deg2dms(_dec)] document["coordinates"]["radec_str"] = _radec_str # for GeoJSON, must be lon:[-180, 180], lat:[-90, 90] (i.e. in deg) @@ -516,6 +532,11 @@ def run( for _ in tqdm(pool.imap(process_file, input_list), total=len(files)): pass + with multiprocessing.Pool(processes=num_proc) as pool: + with tqdm(total=len(files)) as pbar: + for _ in pool.imap_unordered(process_file, input_list): + pbar.update(1) + if __name__ == "__main__": fire.Fire(run)