
mdp and releaser updates
fils committed Apr 24, 2024
1 parent 1163e25 commit b401a49
Showing 8 changed files with 557 additions and 37 deletions.
73 changes: 73 additions & 0 deletions graphOps/extraction/mdp/defs/polar_calls.py
@@ -0,0 +1,73 @@
import re
import polars as pl

def templateQ(qlist, subject, type):
template = qlist['q2'] # later select this based on the type variable
template = template.replace('SUBJECTIRI', subject)
# print(template)

return template

def qrSelects(qlist, t):
template = qlist['q2'] ## TODO update to pull based on type passed

start_word = 'DISTINCT'
end_word = 'WHERE'

start_index = template.find(start_word) + len(start_word)
end_index = template.find(end_word)

between_text = template[start_index:end_index].strip()
matches = re.findall(r'\?\w+', between_text)
result = [match[1:] for match in matches]

if 's' in result:
result.remove('s')

# The regex finds all cases of ?var so remove duplicates if any
unique_list = []
for item in result:
if item not in unique_list:
unique_list.append(item)

return unique_list

# Note: df comes in at 1893 rows
def dataset_list(df, store, qlist):

    sl = qrSelects(qlist, "foo")  # initial projection vars using a placeholder type; recomputed per row below
print(sl)

dl = []
for i in range(len(df)): # this loop will run for len(df)
row = df.slice(i, 1)
s = row['id'][0]
t = row['type'][0] # fetch column 'type'

sl = qrSelects(qlist, t) # get vars for query of type "t"
qr = list(store.query(templateQ(qlist, s, t))) # query RDF for subject s of type t

# print("{} : {}".format(s, t))
# print(qr)

d = dict()
for r in qr:
for term in sl:
if r[term] is not None:
# print("{} {}".format(term, r[term].value))
if r[term].value != '':
d[term] = r[term].value

print(d)
if len(d) > 0:
dl.append(d)

print("Length of dataset query results: {}".format(len(dl)))

df = pl.from_records(dl, schema=sl)

print(len(df))

print(df)
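For context, here is a minimal usage sketch of the two helpers above. The SELECT template, prefixes, and IRIs below are illustrative assumptions, not the contents of the real ./queries/template_dataset.rq; the import assumes the script is run from the graphOps/extraction/mdp directory, matching how mdp_oxigraph.py imports these defs.

from defs import polar_calls

# A hypothetical SELECT template standing in for qlist['q2']
# (the real one is read from ./queries/template_dataset.rq by load_queries.read_files)
example_template = """
PREFIX schema: <https://schema.org/>
SELECT DISTINCT ?s ?name ?description ?url
WHERE {
  BIND(<SUBJECTIRI> AS ?s)
  OPTIONAL { ?s schema:name ?name }
  OPTIONAL { ?s schema:description ?description }
  OPTIONAL { ?s schema:url ?url }
}
"""

qlist = {'q2': example_template}

# qrSelects pulls the projection variables between DISTINCT and WHERE,
# strips the leading '?', drops 's' (the subject is already known), and de-duplicates
print(polar_calls.qrSelects(qlist, "dataset"))
# -> ['name', 'description', 'url']

# templateQ substitutes the subject IRI into the template before it is run against the store
print(polar_calls.templateQ(qlist, "https://example.org/ds/1", "dataset"))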

11 changes: 5 additions & 6 deletions graphOps/extraction/mdp/mdp.py
@@ -23,6 +23,7 @@


def main():
# Params
parser = argparse.ArgumentParser(description="Process some arguments.")
parser.add_argument("--source", type=str, help="Source URL")
parser.add_argument("--output", type=str, help="Output file")
@@ -37,19 +38,18 @@ def main():
print("Error: the --output argument is required")
sys.exit(1)

# URL for the release graph to process
u = args.source
o = args.output

# Load graph
dg = readSource.read_data(u)
mf = graphProcessor(dg)

# Reporting
print("Reporting Stage: The following is the current dataframe shape to exported")
print(mf.info())

# Save to minio
# TODO move to a def file saveobject.py
# Save
saveobject.write_data(o, mf)


@@ -101,6 +101,8 @@ def graphProcessor(dg):
if len(df) > 0: # don't append in empty result sets, breaks the merge
# df.info()
dfl.append(df)
del df
gc.collect()

common_column = ["id", "type"]
merged_df = reduce(lambda left, right: pd.merge(left, right, on=common_column,suffixes=('_left', '_right'), how='outer'), dfl)
@@ -117,9 +119,6 @@ def graphProcessor(dg):

merged_df.info()

# Hint to the garbage collector that it's cleanup time
gc.collect()

### Temporal shaping
print("Processing Stage: Temporal")

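For reference, a minimal, self-contained sketch of the reduce / outer-merge pattern used in graphProcessor above. The two small frames and their non-key columns are made up for illustration; only the join keys ["id", "type"] and the merge call come from the code.

from functools import reduce
import pandas as pd

common_column = ["id", "type"]

# Hypothetical per-query result frames standing in for the dfl list built in graphProcessor
df_name = pd.DataFrame({"id": ["a", "b"], "type": ["Dataset", "Dataset"], "name": ["A", "B"]})
df_url = pd.DataFrame({"id": ["a", "c"], "type": ["Dataset", "Dataset"], "url": ["http://x", "http://y"]})
dfl = [df_name, df_url]

# Outer merge keeps every (id, type) pair seen by any query; columns missing for a pair become NaN
merged_df = reduce(
    lambda left, right: pd.merge(left, right, on=common_column,
                                 suffixes=('_left', '_right'), how='outer'),
    dfl,
)
print(merged_df)
#   id     type name       url
# 0  a  Dataset    A  http://x
# 1  b  Dataset    B       NaN
# 2  c  Dataset  NaN  http://y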
112 changes: 112 additions & 0 deletions graphOps/extraction/mdp/mdp_oxigraph.py
@@ -0,0 +1,112 @@
import argparse
import gc
import re
import sys
import io
import warnings
import datetime
from functools import reduce

import kglab
import numpy as np
import pandas as pd
from dateutil import parser
from rdflib import ConjunctiveGraph # needed for quads
import polars as pl
from tqdm import tqdm
from pyoxigraph import *

from defs import graphshapers
from defs import load_queries
from defs import readSource
from defs import polar_calls
from defs import regionFor
from defs import spatial
from defs import saveobject

warnings.simplefilter(action='ignore', category=FutureWarning) # remove pandas future warning


def main():
# Params
parser = argparse.ArgumentParser(description="Process some arguments.")
parser.add_argument("--source", type=str, help="Source URL")
parser.add_argument("--output", type=str, help="Output file")

args = parser.parse_args()

if args.source is None:
print("Error: the --source argument is required")
sys.exit(1)

if args.output is None:
print("Error: the --output argument is required")
sys.exit(1)

u = args.source
o = args.output

# Load graph
print("RDF download started", datetime.datetime.now())
dg = readSource.read_data(u)
print("RDF downloaded, starting load stage", datetime.datetime.now())

mf = graphProcessor(dg)

# # Reporting
# print("Reporting Stage: The following is the current dataframe shape to exported")
# print(mf.info())
#
# # Save
# saveobject.write_data(o, mf)


def graphProcessor(dg):
r = graphshapers.contextAlignment(dg)

print("RDF loaded, starting query stage", datetime.datetime.now())

store = Store()
mime_type = "application/n-quads"
store.load(io.StringIO(r), mime_type, base_iri=None, to_graph=None)
print("RDF loaded, starting query stage", datetime.datetime.now())

# Load Queries
sfl = [
"./queries/subjectsTypes.rq", # q1
"./queries/template_dataset.rq", # q2
"./queries/baseQuery.rq",
"./queries/course.rq",
"./queries/dataset.rq",
"./queries/person.rq",
"./queries/sup_geo.rq",
"./queries/sup_temporal.rq"
]

qlist = load_queries.read_files(sfl)

# conduct initial query for types and associated subject IRIs
qr = list(store.query(qlist['q1']))

print("Length of SPARQL query results: {}".format(len(qr)))

qrl = []
for r in qr:
qrl.append([r['id'].value, r['type'].value])

# print(qr[0])
# print(qr[0]['id'].value)
# print(qr[0]['type'].value)

# for binding in qr:
# print("{} {}".format(binding['id'].value, binding['type'].value))
df = pl.from_records(qrl, schema=["id", "type"])
print("Length of Polars data frame: {}".format(len(df)))

dsl = polar_calls.dataset_list(df, store, qlist)

return 0


if __name__ == '__main__':
main()
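For reference, a minimal, self-contained sketch of the pyoxigraph load-and-query flow that graphProcessor relies on, mirroring the Store calls used above but with a tiny inline n-quads string and an assumed query in place of the real release graph and ./queries/subjectsTypes.rq.

import io
from pyoxigraph import Store

# Tiny stand-in graph; the real pipeline loads the n-quads produced by graphshapers.contextAlignment
nquads = (
    '<https://example.org/ds/1> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://schema.org/Dataset> .\n'
    '<https://example.org/ds/1> <https://schema.org/name> "Example dataset" .\n'
)

store = Store()  # in-memory store
store.load(io.StringIO(nquads), "application/n-quads", base_iri=None, to_graph=None)

# Assumed shape of the q1 (subjectsTypes.rq) query: one row per (subject IRI, type) pair
q1 = """
SELECT DISTINCT ?id ?type
WHERE { ?id a ?type . }
"""

for solution in store.query(q1):
    # Each binding is a term object; .value gives its lexical form, as used when building qrl above
    print(solution['id'].value, solution['type'].value)
# -> https://example.org/ds/1 https://schema.org/Dataset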
