Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
  • Loading branch information
armandleopold committed Jan 17, 2024
1 parent a35b6d8 commit 84f2cc9
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 63 deletions.
72 changes: 10 additions & 62 deletions profiling_pack/main.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,15 @@
"""
Main file for pack
"""
import re
import json
import warnings
import pandas as pd
from ydata_profiling import ProfileReport
from opener import load_data
import utils

warnings.filterwarnings("ignore", category=DeprecationWarning)

# Function to extract variable name from the content
def extract_variable_name(content):
    """
    Extract the variable name that precedes the word "has" in an alert message.

    Args:
        content: Alert text, e.g. "age has 12 (3.4%) missing values".

    Returns:
        The text found before the first standalone word "has" (surrounding
        whitespace excluded), or an empty string when the message does not
        contain such a word.
    """
    # The trailing \b forces "has" to be a whole word; without it a name such
    # as "user hash" would be cut to "user" because "has" matched inside "hash".
    match = re.search(r"^(.*?)\s+has\b", content)
    if match:
        return match.group(1)
    return ""

def round_if_numeric(value, decimals=2):
    """
    Format a value as a rounded number string when it is numeric.

    Args:
        value: Any object; it is considered numeric when float(value) succeeds.
        decimals: Number of decimal places used for rounding and formatting.

    Returns:
        - the integer representation (e.g. "3") when the rounded value has no
          fractional part,
        - otherwise the value formatted with `decimals` decimal places,
        - str(value) unchanged when the value cannot be converted to a float.
    """
    try:
        rounded_value = round(float(value), decimals)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: integers too large for a float are returned as-is
        # instead of crashing the caller.
        return str(value)

    if rounded_value.is_integer():
        return str(int(rounded_value))

    # Format with the requested precision (previously hard-coded to 2, which
    # ignored the `decimals` argument). Negative precisions are not valid in a
    # format spec, so clamp to 0 for the non-finite values that can reach here.
    return f"{rounded_value:.{max(decimals, 0)}f}"

# Function to extract percentage and determine level
def determine_level(content):
    """
    Derive a severity level from the first percentage found in a message.

    Args:
        content: Text that may contain a percentage such as "12.5%".

    Returns:
        "info" for percentages up to 70, "warning" above 70 and up to 90,
        "high" above 90 and up to 100. "info" is also returned when no
        percentage is present or when the value is greater than 100.
    """
    match = re.search(r"(\d+(\.\d+)?)%", content)
    if not match:
        return "info"  # Default level if no percentage is found

    percentage = float(match.group(1))
    # Contiguous thresholds: the previous integer-bounded ranges (0-70, 71-90,
    # 91-100) left gaps, so fractional values such as 70.5 or 90.5 silently
    # fell back to "info".
    if percentage <= 70:
        return "info"
    if percentage <= 90:
        return "warning"
    if percentage <= 100:
        return "high"
    return "info"  # Out-of-range values keep the historical default


# Denormalize a dictionary with nested dictionaries
def denormalize(data):
    """
    Flatten one level of nesting in a dictionary.

    Values that are dictionaries are expanded into "<outer>_<inner lowercased>"
    entries; every other value is copied under its original key.
    """
    flat = {}
    for outer_key, payload in data.items():
        if not isinstance(payload, dict):
            flat[outer_key] = payload
            continue
        # Expand the nested mapping, prefixing each lower-cased inner key.
        flat.update(
            (f"{outer_key}_{sub_key.lower()}", sub_value)
            for sub_key, sub_value in payload.items()
        )
    return flat
########################### Loading Data

# Load the configuration file
print("Load source_conf.json")
Expand All @@ -77,8 +22,11 @@ def denormalize(data):
pack_config = json.load(file)

# Load data using the opener.py logic
from opener import load_data
df = load_data(source_config, pack_config)

########################### Profiling

# Run the profiling report
profile = ProfileReport(df, minimal=True, title="Profiling Report")
profile.to_file("report.html")
Expand Down Expand Up @@ -109,12 +57,12 @@ def denormalize(data):
with open("report.json", "r", encoding="utf-8") as file:
report = json.load(file)

general_data = denormalize(report["table"])
general_data = utils.denormalize(report["table"])
new_format_data = []
for key, value in general_data.items():
entry = {
"key": key,
"value": round_if_numeric(value),
"value": utils.round_if_numeric(value),
"scope": {"perimeter": "dataset", "value": source_config["name"]},
}
new_format_data.append(entry)
Expand All @@ -126,7 +74,7 @@ def denormalize(data):
for attr_name, attr_value in attributes.items():
entry = {
"key": attr_name,
"value": round_if_numeric(attr_value),
"value": utils.round_if_numeric(attr_value),
"scope": {"perimeter": "column", "value": variable_name},
}
new_format_data.append(entry)
Expand Down Expand Up @@ -162,10 +110,10 @@ def denormalize(data):
alerts.columns = ["content", "type"]

# Apply the extract_variable_name function to set the 'scope' column
alerts["scope"] = alerts["content"].apply(lambda x: {"perimeter": "column", "value": extract_variable_name(x)})
alerts["scope"] = alerts["content"].apply(lambda x: {"perimeter": "column", "value": utils.extract_variable_name(x)})

# Apply determine_level to the 'content' column to set the 'level' column
alerts["level"] = alerts["content"].apply(determine_level)
alerts["level"] = alerts["content"].apply(utils.determine_level)

############################ Schemas

Expand Down
2 changes: 1 addition & 1 deletion profiling_pack/properties.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ icon: icon.png
name: profiling
type: completeness
url: https://github.com/qalita-io/packs/tree/main/profiling_pack
version: 1.0.52
version: 1.0.53
visibility: public
59 changes: 59 additions & 0 deletions profiling_pack/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
### Contains general utility functions ###
import re

# Function to extract variable name from the content
def extract_variable_name(content):
    """
    Extract the variable name that precedes the word "has" in an alert message.

    Args:
        content: Alert text, e.g. "age has 12 (3.4%) missing values".

    Returns:
        The text found before the first standalone word "has" (surrounding
        whitespace excluded), or an empty string when the message does not
        contain such a word.
    """
    # The trailing \b forces "has" to be a whole word; without it a name such
    # as "user hash" would be cut to "user" because "has" matched inside "hash".
    match = re.search(r"^(.*?)\s+has\b", content)
    if match:
        return match.group(1)
    return ""

def round_if_numeric(value, decimals=2):
    """
    Format a value as a rounded number string when it is numeric.

    Args:
        value: Any object; it is considered numeric when float(value) succeeds.
        decimals: Number of decimal places used for rounding and formatting.

    Returns:
        - the integer representation (e.g. "3") when the rounded value has no
          fractional part,
        - otherwise the value formatted with `decimals` decimal places,
        - str(value) unchanged when the value cannot be converted to a float.
    """
    try:
        rounded_value = round(float(value), decimals)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: integers too large for a float are returned as-is
        # instead of crashing the caller.
        return str(value)

    if rounded_value.is_integer():
        return str(int(rounded_value))

    # Format with the requested precision (previously hard-coded to 2, which
    # ignored the `decimals` argument). Negative precisions are not valid in a
    # format spec, so clamp to 0 for the non-finite values that can reach here.
    return f"{rounded_value:.{max(decimals, 0)}f}"

# Function to extract percentage and determine level
def determine_level(content):
    """
    Derive a severity level from the first percentage found in a message.

    Args:
        content: Text that may contain a percentage such as "12.5%".

    Returns:
        "info" for percentages up to 70, "warning" above 70 and up to 90,
        "high" above 90 and up to 100. "info" is also returned when no
        percentage is present or when the value is greater than 100.
    """
    match = re.search(r"(\d+(\.\d+)?)%", content)
    if not match:
        return "info"  # Default level if no percentage is found

    percentage = float(match.group(1))
    # Contiguous thresholds: the previous integer-bounded ranges (0-70, 71-90,
    # 91-100) left gaps, so fractional values such as 70.5 or 90.5 silently
    # fell back to "info".
    if percentage <= 70:
        return "info"
    if percentage <= 90:
        return "warning"
    if percentage <= 100:
        return "high"
    return "info"  # Out-of-range values keep the historical default


# Denormalize a dictionary with nested dictionaries
def denormalize(data):
    """
    Flatten one level of nesting in a dictionary.

    Values that are dictionaries are expanded into "<outer>_<inner lowercased>"
    entries; every other value is copied under its original key.
    """
    flat = {}
    for outer_key, payload in data.items():
        if not isinstance(payload, dict):
            flat[outer_key] = payload
            continue
        # Expand the nested mapping, prefixing each lower-cased inner key.
        flat.update(
            (f"{outer_key}_{sub_key.lower()}", sub_value)
            for sub_key, sub_value in payload.items()
        )
    return flat

0 comments on commit 84f2cc9

Please sign in to comment.