Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Magneto #94

Merged
merged 3 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bdikit/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# Download URLs for the built-in models, keyed by model name.
BUILTIN_MODELS_BOX_URL = {
"cl-reducer-v0.1": "https://nyu.box.com/shared/static/hc4qxzbuxz0uoynfwy4pe2yxo5ch6xgm.pt",
"bdi-cl-v0.2": "https://nyu.box.com/shared/static/1vdc28kzbpoj6ey95bksaww541p9gj31.pt",
"magneto-gdc-v0.1": "https://nyu.box.com/shared/static/140g2rq1izc1wqs1ssrml6jzag3qa0mu.pth",
}

BDIKIT_EMBEDDINGS_CACHE_DIR = os.path.join(BDIKIT_CACHE_DIR, "embeddings")
Expand Down
19 changes: 19 additions & 0 deletions bdikit/schema_matching/one2one/matcher_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,25 @@ class SchemaMatchers(Enum):
"max_val_sim",
"bdikit.schema_matching.one2one.maxvalsim.MaxValSimSchemaMatcher",
)
MAGNETO = (
"magneto_zs_bp",
"bdikit.schema_matching.topk.magneto.Magneto",
)

MAGNETO_FT = (
"magneto_ft_bp",
"bdikit.schema_matching.topk.magneto.MagnetoFT",
)

MAGNETO_GPT = (
"magneto_zs_llm",
"bdikit.schema_matching.topk.magneto.MagnetoGPT",
)

MAGNETO_FTGPT = (
"magneto_ft_llm",
"bdikit.schema_matching.topk.magneto.MagnetoFTGPT",
)

def __init__(self, matcher_name: str, matcher_path: str):
self.matcher_name = matcher_name
Expand Down
131 changes: 131 additions & 0 deletions bdikit/schema_matching/topk/magneto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import pandas as pd
from typing import Dict, Any, List
from magneto import Magneto as Magneto_Lib
from bdikit.schema_matching.one2one.base import BaseSchemaMatcher
from bdikit.download import get_cached_model_or_download
from bdikit.schema_matching.topk.base import (
ColumnScore,
TopkMatching,
BaseTopkSchemaMatcher,
)

# Model name used by the fine-tuned Magneto matchers when the caller does not
# pass ``model_name``; it is resolved through ``get_cached_model_or_download``.
DEFAULT_MAGNETO_MODEL = "magneto-gdc-v0.1"


class MagnetoBase(BaseSchemaMatcher, BaseTopkSchemaMatcher):
    """Adapter that exposes the Magneto library through both matcher interfaces.

    ``map`` returns the single best target column for each source column and
    ``get_recommendations`` returns a ranked list of candidates per source
    column. Both delegate to one wrapped ``Magneto_Lib`` instance.
    """

    def __init__(self, kwargs: Dict[str, Any] = None):
        """Create the wrapped Magneto instance.

        Args:
            kwargs: Keyword arguments forwarded verbatim to the ``Magneto_Lib``
                constructor. ``None`` means "use Magneto's defaults".
        """
        if kwargs is None:
            kwargs = {}
        self.magneto = Magneto_Lib(**kwargs)

    def _ranked_matches(
        self, source: pd.DataFrame, target: pd.DataFrame, top_k: int
    ) -> Dict[Any, List[Any]]:
        """Run Magneto and group its matches by source column, best score first.

        Returns a dict mapping each source column that received at least one
        match to a list of ``(target_column, score)`` tuples sorted by score in
        descending order. Dict order follows the order in which Magneto
        reported the source columns.
        """
        # Magneto does not provide a method to set topk, so write the
        # parameter directly on the wrapped instance.
        self.magneto.params["topk"] = top_k
        raw_matches = self.magneto.get_matches(source, target)

        # Each key is a (source_key, target_key) pair; the column name is the
        # second element of each (presumably the first is the table name --
        # confirm against the Magneto documentation).
        grouped = {}
        for (source_key, target_key), score in raw_matches.items():
            grouped.setdefault(source_key[1], []).append((target_key[1], score))

        return {
            column: sorted(candidates, key=lambda item: item[1], reverse=True)
            for column, candidates in grouped.items()
        }

    def map(
        self,
        source: pd.DataFrame,
        target: pd.DataFrame,
    ):
        """Return a dict mapping each matched source column to its best target.

        Source columns for which Magneto reports no match are absent from the
        result.
        """
        # There is an issue in Magneto to get the top-1 match, so get top 2
        # and then keep only the best candidate.
        ranked = self._ranked_matches(source, target, top_k=2)
        return {column: candidates[0][0] for column, candidates in ranked.items()}

    def get_recommendations(
        self, source: pd.DataFrame, target: pd.DataFrame, top_k: int
    ) -> List[TopkMatching]:
        """Return up to ``top_k`` ranked target candidates per source column.

        Each entry of the returned list is a dict with:
            * ``"source_column"``: the source column name repeated once per
              candidate, and
            * ``"top_k_columns"``: ``ColumnScore`` items sorted by descending
              score.
        """
        # Apply the same workaround as ``map``: never ask Magneto for fewer
        # than two candidates (its top-1 retrieval is unreliable), then trim
        # the ranked list back to what the caller requested.
        ranked = self._ranked_matches(source, target, top_k=max(top_k, 2))
        limit = max(top_k, 0)

        top_k_results = []
        for column, candidates in ranked.items():
            top_k_columns = [
                ColumnScore(name, score) for name, score in candidates[:limit]
            ]
            top_k_results.append(
                {
                    "source_column": [column] * len(top_k_columns),
                    "top_k_columns": top_k_columns,
                }
            )

        return top_k_results


class Magneto(MagnetoBase):
    """Magneto matcher built with the library's default configuration.

    No options are overridden: the wrapped Magneto instance is constructed
    without any keyword arguments.
    """

    def __init__(self):
        # Passing ``None`` makes the base class forward no keyword arguments.
        super().__init__(kwargs=None)


class MagnetoFT(MagnetoBase):
    """Magneto matcher configured with an explicit embedding model.

    The embedding model is either a built-in model referenced by name or a
    model referenced by ``model_path``.
    """

    def __init__(
        self,
        encoding_mode: str = "header_values_verbose",
        model_name: str = DEFAULT_MAGNETO_MODEL,
        model_path: str = None,
    ):
        """Resolve the embedding model and build the wrapped Magneto instance.

        Args:
            encoding_mode: Forwarded to Magneto as ``encoding_mode``.
            model_name: Name of a built-in model, resolved through
                ``get_cached_model_or_download``. Ignored when it is still the
                default and ``model_path`` is given.
            model_path: Value passed to Magneto as ``embedding_model`` instead
                of a built-in model.

        Raises:
            ValueError: If a non-default ``model_name`` and ``model_path`` are
                both given, or if neither is given.
        """
        # ``model_name`` has a non-empty default, so a caller passing only
        # ``model_path`` would otherwise always fail the mutual-exclusion
        # check. Treat the untouched default as "no name given" in that case.
        if model_path and model_name == DEFAULT_MAGNETO_MODEL:
            model_name = None

        embedding_model = check_magneto_model(model_name, model_path)
        kwargs = {"encoding_mode": encoding_mode, "embedding_model": embedding_model}
        super().__init__(kwargs)


class MagnetoGPT(MagnetoBase):
    """Magneto matcher that swaps the bipartite reranker for the GPT reranker.

    The embedding model is left at Magneto's default.
    """

    def __init__(self):
        # Disable the bipartite reranker and enable the GPT-based one.
        super().__init__({"use_bp_reranker": False, "use_gpt_reranker": True})


class MagnetoFTGPT(MagnetoBase):
    """Magneto matcher with an explicit embedding model and the GPT reranker.

    The embedding model is either a built-in model referenced by name or a
    model referenced by ``model_path``. The bipartite reranker is disabled and
    the GPT reranker is enabled.
    """

    def __init__(
        self,
        encoding_mode: str = "header_values_verbose",
        model_name: str = DEFAULT_MAGNETO_MODEL,
        model_path: str = None,
    ):
        """Resolve the embedding model and build the wrapped Magneto instance.

        Args:
            encoding_mode: Forwarded to Magneto as ``encoding_mode``.
            model_name: Name of a built-in model, resolved through
                ``get_cached_model_or_download``. Ignored when it is still the
                default and ``model_path`` is given.
            model_path: Value passed to Magneto as ``embedding_model`` instead
                of a built-in model.

        Raises:
            ValueError: If a non-default ``model_name`` and ``model_path`` are
                both given, or if neither is given.
        """
        # ``model_name`` has a non-empty default, so a caller passing only
        # ``model_path`` would otherwise always fail the mutual-exclusion
        # check. Treat the untouched default as "no name given" in that case.
        if model_path and model_name == DEFAULT_MAGNETO_MODEL:
            model_name = None

        embedding_model = check_magneto_model(model_name, model_path)
        kwargs = {
            "encoding_mode": encoding_mode,
            "embedding_model": embedding_model,
            "use_bp_reranker": False,
            "use_gpt_reranker": True,
        }
        super().__init__(kwargs)


def check_magneto_model(model_name: str, model_path: str):
    """Pick the embedding model from a built-in name or an explicit path.

    Exactly one of the two arguments must be truthy.

    Args:
        model_name: Name of a built-in model; it is passed to
            ``get_cached_model_or_download`` and that call's result is
            returned.
        model_path: Returned unchanged when it is the argument provided.

    Returns:
        ``model_path`` itself, or the result of
        ``get_cached_model_or_download(model_name)``.

    Raises:
        ValueError: If both arguments are provided, or if neither is.
    """
    has_name = bool(model_name)
    has_path = bool(model_path)

    if has_name and has_path:
        raise ValueError(
            "Only one of model_name or model_path should be provided "
            "(they are mutually exclusive)"
        )
    if not (has_name or has_path):
        raise ValueError("Either model_name or model_path must be provided")

    if has_path:
        return model_path
    return get_cached_model_or_download(model_name)
20 changes: 20 additions & 0 deletions bdikit/schema_matching/topk/matcher_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,26 @@ class TopkMatchers(Enum):
"bdikit.schema_matching.topk.contrastivelearning.CLTopkSchemaMatcher",
)

MAGNETO = (
"magneto_zs_bp",
"bdikit.schema_matching.topk.magneto.Magneto",
)

MAGNETO_FT = (
"magneto_ft_bp",
"bdikit.schema_matching.topk.magneto.MagnetoFT",
)

MAGNETO_GPT = (
"magneto_zs_llm",
"bdikit.schema_matching.topk.magneto.MagnetoGPT",
)

MAGNETO_FTGPT = (
"magneto_ft_llm",
"bdikit.schema_matching.topk.magneto.MagnetoFTGPT",
)

def __init__(self, matcher_name: str, matcher_path: str):
self.matcher_name = matcher_name
self.matcher_path = matcher_path
Expand Down
12 changes: 12 additions & 0 deletions docs/source/schema-matching.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ To see how to use these methods, please refer to the documentation of :py:func:`
* - Method
- Class
- Description
* - ``magneto_zs_bp``
- :class:`~bdikit.schema_matching.topk.magneto.Magneto`
- | Uses a zero-shot small language model as retriever with the bipartite algorithm as reranker in Magneto.
* - ``magneto_ft_bp``
- :class:`~bdikit.schema_matching.topk.magneto.MagnetoFT`
- | Uses a fine-tuned small language model as retriever with the bipartite algorithm as reranker in Magneto.
* - ``magneto_zs_llm``
- :class:`~bdikit.schema_matching.topk.magneto.MagnetoGPT`
- | Uses a zero-shot small language model as retriever with a large language model as reranker in Magneto.
* - ``magneto_ft_llm``
- :class:`~bdikit.schema_matching.topk.magneto.MagnetoFTGPT`
- | Uses a fine-tuned small language model as retriever with a large language model as reranker in Magneto.
* - ``ct_learning``
- :class:`~bdikit.schema_matching.one2one.contrastivelearning.ContrastiveLearningSchemaMatcher`
- | Uses a contrastive (CT) learning model to learn embeddings for columns and retrieves the most similar columns (the best matches) using the cosine similarity between the column embeddings.
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ requests
scipy<1.13
matplotlib<3.9
panel!=1.4.3
nltk>=3.9.1
nltk>=3.9.1
magneto-python
Loading