Commit: mdp code place holders

fils committed Jan 6, 2024
1 parent a9c25b7 commit cc0f6ed
Showing 5 changed files with 187 additions and 37 deletions.
13 changes: 13 additions & 0 deletions graphOps/extraction/mdp/defs/readSource.py
@@ -1,4 +1,5 @@
from urllib.request import urlopen
# from minio import Minio


def read_data(source):
@@ -11,3 +12,15 @@ def read_data(source):
with open(source, 'r') as file:
dg = file.read()
return dg


# add in s3:// support
# # Initialize minioClient with an endpoint and access/secret keys.
# minioClient = Minio('play.minio.io:9000',
# access_key='YOURACCESSKEY',
# secret_key='YOURSECRETKEY',
# secure=True)
#
# # Use function to read object into a string
# object_string = read_object_to_string(minioClient, 'mybucket', 'myobject')
# print(object_string)
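A hypothetical sketch of the s3:// support flagged in the comments above, reusing read_object_to_string() from readobject.py; the endpoint, env-var names, and import path are assumptions, not part of this commit:

```python
# Hypothetical sketch only -- not part of this commit. One way read_data() could
# dispatch on an s3:// source, reusing read_object_to_string() from readobject.py.
# The endpoint, env-var names, and import path are assumptions.
import os
from urllib.parse import urlparse

from minio import Minio

from defs.readobject import read_object_to_string  # import path is an assumption


def read_data_s3(source):
    # s3://bucket/path/to/object -> bucket name, object key
    parsed = urlparse(source)
    bucket, key = parsed.netloc, parsed.path.lstrip("/")
    mc = Minio("play.minio.io:9000",
               access_key=os.getenv("MINIO_ACCESS_KEY"),
               secret_key=os.getenv("MINIO_SECRET_KEY"),
               secure=True)
    return read_object_to_string(mc, bucket, key)
```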
9 changes: 9 additions & 0 deletions graphOps/extraction/mdp/defs/readobject.py
@@ -0,0 +1,9 @@


def read_object_to_string(mc, bucket_name, object_name):
try:
data = mc.get_object(bucket_name, object_name)
data_str = data.read().decode('utf-8')
return data_str
except Exception as e:
print(e)
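A minimal usage sketch for the helper above, mirroring the commented-out example in readSource.py; the endpoint, keys, bucket, and object names are placeholders:

```python
# Placeholder endpoint/keys/bucket/object, echoing the commented example in readSource.py.
from minio import Minio

minioClient = Minio('play.minio.io:9000',
                    access_key='YOURACCESSKEY',
                    secret_key='YOURSECRETKEY',
                    secure=True)

object_string = read_object_to_string(minioClient, 'mybucket', 'myobject')
print(object_string)
```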
91 changes: 54 additions & 37 deletions graphOps/extraction/mdp/mdp.py
@@ -49,9 +49,6 @@ def main():
print("Saving results to file")
_, file_extension = os.path.splitext(output_file)

## TODO why is this here? likely should be down in the geo section
mf['centroid'] = mf['centroid'].astype(str)

if file_extension == '.parquet':
mf.to_parquet(output_file, engine='fastparquet') # engine must be one of 'pyarrow', 'fastparquet'
elif file_extension == '.csv':
@@ -84,7 +81,8 @@ def graphProcessor(dg):
urls = [
"https://raw.githubusercontent.com/iodepo/odis-in/master/SPARQL/searchOIH/sup_geo.rq",
"https://raw.githubusercontent.com/iodepo/odis-in/master/SPARQL/searchOIH/sup_temporal.rq",
"https://raw.githubusercontent.com/iodepo/odis-in/master/SPARQL/searchOIH/dataset.rq"
"https://raw.githubusercontent.com/iodepo/odis-in/master/SPARQL/searchOIH/dataset.rq",
"https://raw.githubusercontent.com/iodepo/odis-in/master/SPARQL/searchOIH/person.rq"
]

for url in urls:
@@ -104,12 +102,11 @@ def graphProcessor(dg):
print(f"Error: {e}")

# NOTE: In many IDEs the vars in qlist will look like an error since these vars are dynamically set at runtime via globals()
qlist = [datasetrq, sup_georq, sup_temporalrq]
qlist = [personrq, datasetrq, sup_georq, sup_temporalrq]

dfl = []
print(
"Processing {} SPARQL queries. This will be slow and the progress bar updates only on query completion".format(
len(qlist)))
"Processing {} SPARQL queries. Can be slow, progress bar updates on query completion".format(len(qlist)))
for q in tqdm(qlist):
df = kg.query_as_df(q)
if len(df) > 0: # don't append in empty result sets, breaks the merge
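For readers puzzled by the NOTE above, a hedged sketch of the globals() pattern it describes; the filename-to-variable naming rule shown here is an inference from the qlist entries, not confirmed by this commit:

```python
# Hedged sketch of the dynamic-variable pattern the NOTE refers to: each SPARQL
# file fetched from GitHub is stored under a module-level name derived from its
# filename, e.g. ".../sup_geo.rq" -> sup_georq. The naming rule is an inference.
import os
from urllib.request import urlopen


def load_query_globals(urls):
    for url in urls:
        name = os.path.basename(url).replace(".rq", "") + "rq"  # sup_geo.rq -> sup_georq
        globals()[name] = urlopen(url).read().decode("utf-8")
```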
@@ -131,27 +128,51 @@ def graphProcessor(dg):
# Hint to the garbage collector that it's cleanup time
gc.collect()

### Temporal shaping
print("Processing Stage: Temporal")

if "temporalCoverage" in merged_df.columns:
merged_df['temporalCoverage'] = merged_df['temporalCoverage'].astype( 'str') # fine to make str since we don't use in the solr JSON
merged_df['dt_startDate'] = merged_df['temporalCoverage'].apply( lambda x: re.split("/", x)[0] if "/" in x else np.nan)
merged_df['dt_endDate'] = merged_df['temporalCoverage'].apply(lambda x: re.split("/", x)[1] if "/" in x else np.nan)
merged_df['n_startYear'] = merged_df['dt_startDate'].apply( lambda x: parser.parse(x).year if "-" in str(x) else np.nan)
merged_df['n_endYear'] = merged_df['dt_endDate'].apply(lambda x: parser.parse(x).year if "-" in str(x) else np.nan)
else:
print("NOTE: no temporal data found")

# EXIT
# sys.exit(0)

# Check frames after temporal ETL calls
# merged_df.info()
# merged_df.head()
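A worked example of the temporal split above, on a made-up ISO 8601 interval of the form the code expects:

```python
# Worked example of the temporalCoverage handling above, on a made-up value.
import re

import numpy as np
from dateutil import parser

tc = "2007-03-01/2010-11-30"  # schema.org-style ISO 8601 interval
dt_start = re.split("/", tc)[0] if "/" in tc else np.nan  # "2007-03-01"
dt_end = re.split("/", tc)[1] if "/" in tc else np.nan    # "2010-11-30"
n_start = parser.parse(dt_start).year if "-" in str(dt_start) else np.nan  # 2007
n_end = parser.parse(dt_end).year if "-" in str(dt_end) else np.nan        # 2010
print(dt_start, dt_end, n_start, n_end)
```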

### GeoSpatial
print("Processing Stage: Geospatial")

merged_df['filteredgeom'] = merged_df['geom'].apply(lambda x: np.nan if graphshapers.contains_alpha(x) else x)
if "geom" in merged_df.columns:
merged_df['filteredgeom'] = merged_df['geom'].apply(lambda x: np.nan if graphshapers.contains_alpha(x) else x)

## Geometry representations
## Geometry representations

print("Processing Stage: Geospatial centroid")
merged_df['centroid'] = merged_df['filteredgeom'].apply(lambda x: spatial.gj(str(x), "centroid"))
print("Processing Stage: Geospatial centroid")
merged_df['centroid'] = merged_df['filteredgeom'].apply(lambda x: spatial.gj(str(x), "centroid"))

print("Processing Stage: Geospatial length")
merged_df['length'] = merged_df['filteredgeom'].apply(lambda x: spatial.gj(str(x), "length"))
print("Processing Stage: Geospatial length")
merged_df['length'] = merged_df['filteredgeom'].apply(lambda x: spatial.gj(str(x), "length"))

print("Processing Stage: Geospatial area")
merged_df['area'] = merged_df['filteredgeom'].apply(lambda x: spatial.gj(str(x), "area"))
print("Processing Stage: Geospatial area")
merged_df['area'] = merged_df['filteredgeom'].apply(lambda x: spatial.gj(str(x), "area"))

print("Processing Stage: Geospatial wkt")
merged_df['wkt'] = merged_df['filteredgeom'].apply(lambda x: spatial.gj(str(x), "wkt"))
print("Processing Stage: Geospatial wkt")
merged_df['wkt'] = merged_df['filteredgeom'].apply(lambda x: spatial.gj(str(x), "wkt"))

print("Processing Stage: Geospatial geojson")
merged_df['geojson'] = merged_df['filteredgeom'].apply(lambda x: spatial.gj(str(x), "geojson"))
print("Processing Stage: Geospatial geojson")
merged_df['geojson'] = merged_df['filteredgeom'].apply(lambda x: spatial.gj(str(x), "geojson"))


else:
print("NOTE: no geometry data found")

# TODO, incorporate Jeff's code as a Lambda function (will need to support multiple possible regions per entry)
if "name" in merged_df.columns:
@@ -169,30 +190,24 @@ def graphProcessor(dg):

# merged_df = regionFor.mergeRegions(merged_df.copy())

### Temporal shaping
print("Processing Stage: Temporal")

merged_df['temporalCoverage'] = merged_df['temporalCoverage'].astype(
'str') # fine to make str since we don't use in the solr JSON
merged_df['dt_startDate'] = merged_df['temporalCoverage'].apply(
lambda x: re.split("/", x)[0] if "/" in x else np.nan)
merged_df['dt_endDate'] = merged_df['temporalCoverage'].apply(lambda x: re.split("/", x)[1] if "/" in x else np.nan)
merged_df['n_startYear'] = merged_df['dt_startDate'].apply(
lambda x: parser.parse(x).year if "-" in str(x) else np.nan)
merged_df['n_endYear'] = merged_df['dt_endDate'].apply(lambda x: parser.parse(x).year if "-" in str(x) else np.nan)

# Check frames after temporal ETL calls
# merged_df.info()
# merged_df.head()

### Dataframe groupby, aggregation and joins
print("Processing Stage: Dataframe shaping")

merged_df['id'] = merged_df['id'].astype(str) # why is this needed?
merged_df['url'] = merged_df['url'].astype(str) # why is this needed?
merged_df['description'] = merged_df['description'].astype(str) # why is this needed?


# transforms needed for aggregation
merged_df['keywords'] = merged_df['keywords'].astype(str) # why is this needed?
merged_df['geom'] = merged_df['geom'].astype(str) # why is this needed?
merged_df['filteredgeom'] = merged_df['filteredgeom'].astype(str) # why is this needed?
merged_df['centroid'] = merged_df['centroid'].astype(str) # why is this needed?
if "keywords" in merged_df.columns:
merged_df['keywords'] = merged_df['keywords'].astype(str) # why is this needed?

if "geom" in merged_df.columns:
merged_df['geom'] = merged_df['geom'].astype(str) # why is this needed?
merged_df['filteredgeom'] = merged_df['filteredgeom'].astype(str) # why is this needed?
merged_df['centroid'] = merged_df['centroid'].astype(str) # why is this needed?

agg_dict = {'keywords': ', '.join,
'type': 'first',
@@ -225,6 +240,8 @@ def graphProcessor(dg):
if col not in merged_df.columns:
del agg_dict[col]

print(merged_df.head())

mf = merged_df.groupby('id').agg(agg_dict).reset_index()

return mf
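For context on the groupby/agg step above, a tiny example of the pattern on made-up rows: per-id string columns like keywords are joined, the rest take their first value.

```python
# Tiny illustration of the groupby/agg pattern used above, on made-up rows.
import pandas as pd

demo = pd.DataFrame({
    "id":       ["d1", "d1", "d2"],
    "keywords": ["ocean", "salinity", "plankton"],
    "type":     ["Dataset", "Dataset", "Dataset"],
})
agg_dict = {"keywords": ", ".join, "type": "first"}
print(demo.groupby("id").agg(agg_dict).reset_index())
# -> one row per id, with keywords joined: "ocean, salinity" for d1
```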
25 changes: 25 additions & 0 deletions graphOps/releaser/README.md
@@ -0,0 +1,25 @@
# Releaser

## About

This code is used to make a public snapshot of the current
set of graphs in .../graphs/latest into public/graphs.

It should do this as a snapshot set, using a prefix like:





### DEPRECATED: the following is no longer the pattern; date the prefix, not the object
For the latest *_release.nq
```bash
public/graphs/odisgraph_v1_[DATE]
```
and

For the latest *_prov.nq
```bash
public/graphs/odisgraph_v1_prov_[DATE]
```

86 changes: 86 additions & 0 deletions graphOps/releaser/releaser.py
@@ -0,0 +1,86 @@
# makes a geopackage and h3 output files
# bring in https://colab.research.google.com/drive/1cIGmvpyEG2giLeeUCdzHXgb9w4Mp8h2_

from minio import Minio
from minio.commonconfig import CopySource
import os


# read oih/gleaner.oih/graphs/latest
# copy into/over public/graphs if not 0

def main():
# read the items in a prefix

sk = os.getenv("MINIO_SECRET_KEY")
ak = os.getenv("MINIO_ACCESS_KEY")

# Create client with access and secret key.
client = Minio("nas.lan:49153", ak, sk, secure=False)

sbucket = "gleaner.oih"
sprefix = "graphs/latest/"
ll = olist(client, sbucket, sprefix)
# for o in ll:
# print(o)

dbucket = "public"
dprefix = "graphs/test1/"
pl = olist(client, dbucket, dprefix)
# for o in pl:
# print(o)

print("------------------------")

# dl = diff_lists(ll, pl)
dl = diff_lists(remove_prefix(ll, sprefix), remove_prefix(pl, dprefix))

for o in dl:
print("need to copy over {}".format(o))
ocopy(sbucket, sprefix, dbucket, dprefix, o, client)


def ocopy(src_bucket, src_prefix, dst_bucket, dst_prefix, object_name, client):
src = CopySource(src_bucket, src_prefix + object_name)

try:
# copy object with new key in same bucket.
client.copy_object(dst_bucket, dst_prefix + object_name, src)
print(f"Object '{object_name}' has been copied to '{dst_prefix + object_name}'")
except Exception as err:
print(f"An error occurred: {err}")


def diff_lists(source, destination):
return [item for item in source if item not in destination]


def remove_prefix(lst, prefix):
return [item.replace(prefix, '', 1) for item in lst]


def olist(client, bucket, prefix):
objects = client.list_objects(
bucket, prefix=prefix, recursive=True,
)

onl = [obj.object_name for obj in objects]

znl = []
for o in onl:
try:
obj_info = client.stat_object(bucket, o)
# print("{} is {} ".format(o, obj_info.size))
if obj_info.size <= 0:
znl.append(o)
print("found 0 len object {}".format(o))
except Exception as e:
print(e)

onl = [item for item in onl if item not in znl]

return onl


if __name__ == '__main__':
main()
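A quick example of the list-diffing helpers defined above (diff_lists and remove_prefix), with made-up object names, showing why the prefixes are stripped before comparing:

```python
# Made-up object names: items are compared by their name relative to each prefix,
# so only objects missing from the destination get copied.
src = ["graphs/latest/summoned_release.nq", "graphs/latest/summoned_prov.nq"]
dst = ["graphs/test1/summoned_release.nq"]
todo = diff_lists(remove_prefix(src, "graphs/latest/"),
                  remove_prefix(dst, "graphs/test1/"))
print(todo)  # -> ['summoned_prov.nq']
```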
