diff --git a/README.md b/README.md index e3aefb96..1b89d87a 100644 --- a/README.md +++ b/README.md @@ -29,29 +29,34 @@ Deploy a pre-trained embedding model without writing a single line of code. # Installation
Details -``` -pip install dicee +``` bash +git clone https://github.com/dice-group/dice-embeddings.git +conda create -n dice python=3.10 --no-default-packages && conda activate dice +pip3 install -r requirements.txt ``` - or - +```bash +pip install dicee ``` -git clone https://github.com/dice-group/dice-embeddings.git -conda create -n dice python=3.10 --no-default-packages && conda activate dice -pip3 install "pandas>=1.5.1" +or +```bash pip3 install "torch>=2.0.0" +pip3 install "pandas>=1.5.1" pip3 install "polars>=0.16.14" pip3 install "scikit-learn>=1.2.2" pip3 install "pyarrow>=11.0.0" -pip3 install "pytest>=7.2.2" -pip3 install "gradio>=3.23.0" -pip3 install "psutil>=5.9.4" pip3 install "pytorch-lightning==1.6.4" pip3 install "pykeen==1.10.1" pip3 install "zstandard>=0.21.0" +pip3 install "pytest>=7.2.2" +pip3 install "psutil>=5.9.4" +pip3 install "ruff>=0.0.284" +pip3 install "gradio>=3.23.0" +pip3 install "rdflib>=7.0.0" ``` + To test the Installation -``` +```bash wget https://hobbitdata.informatik.uni-leipzig.de/KG/KGs.zip unzip KGs.zip pytest -p no:warnings -x # it takes circa 15 minutes @@ -66,19 +71,8 @@ pyreverse dicee/trainer && dot -Tpng -x classes.dot -o trainer.png && eog traine ```
-## Docker -To build the Docker image: -``` -docker build -t dice-embeddings . -``` - -To test the Docker image: -``` -docker run --rm -v ~/.local/share/dicee/KGs:/dicee/KGs dice-embeddings ./main.py --model AConEx --embedding_dim 16 -``` - # Knowledge Graph Embedding Models -
Details +
To see available Models 1. TransE, DistMult, ComplEx, ConEx, QMult, OMult, ConvO, ConvQ, Keci 2. All 44 models available in https://github.com/pykeen/pykeen#models @@ -87,17 +81,54 @@ docker run --rm -v ~/.local/share/dicee/KGs:/dicee/KGs dice-embeddings ./main.py
# How to Train -
Details +
To see examples + +Train a KGE model and evaluate it on the train, validation, and test sets of the UMLS benchmark dataset. +```bash +python main.py --path_dataset_folder "KGs/UMLS" --model Keci --eval_model "train_val_test" +``` +where the data is in the following form +```bash +$ head -3 KGs/UMLS/train.txt +acquired_abnormality location_of experimental_model_of_disease +anatomical_abnormality manifestation_of physiologic_function +alga isa entity +``` +Models can be easily trained in a single node multi-gpu setting +```bash +python main.py --accelerator "gpu" --strategy "ddp" --path_dataset_folder "KGs/UMLS" --model Keci --eval_model "train_val_test" +``` -> Please refer to `examples`. +Train a KGE model by providing the path of a single file and store all parameters under newly created directory +called `KeciFamilyRun`. +```bash +python main.py --path_single_kg "KGs/Family/train.txt" --model Keci --path_to_store_single_run KeciFamilyRun +``` +where the data is in the following form +```bash +$ head -3 KGs/Family/train.txt +_:1 . + . + . +``` +**Apart from n-triples or standard link prediction dataset formats, we support ["owl", "nt", "turtle", "rdf/xml", "n3"]***. +Moreover, a KGE model can be also trained by providing **an endpoint of a triple store**. +```bash +python main.py --sparql_endpoint "http://localhost:3030/mutagenesis/" --model Keci +``` +For more, please refer to `examples`.
# How to Deploy +
To see a single line of code + ```python from dicee import KGE KGE(path='...').deploy(share=True,top_k=10) ``` +
+
To see the interface of the webservice Italian Trulli
@@ -172,6 +203,20 @@ Please contact: ```caglar.demir@upb.de ``` or ```caglardemir8@gmail.com ``` , i - [FB15K-237 ConEx embeddings](https://hobbitdata.informatik.uni-leipzig.de/KGE/conex/FB15K-237.zip) - [WN18RR ConEx embeddings](https://hobbitdata.informatik.uni-leipzig.de/KGE/conex/WN18RR.zip) - For more please look at [Hobbit Data](https://hobbitdata.informatik.uni-leipzig.de/KGE/) + +## Docker +
Details +To build the Docker image: +``` +docker build -t dice-embeddings . +``` + +To test the Docker image: +``` +docker run --rm -v ~/.local/share/dicee/KGs:/dicee/KGs dice-embeddings ./main.py --model AConEx --embedding_dim 16 +``` +
+ ### Documentation In documents folder, we explained many details about knowledge graphs, knowledge graph embeddings, training strategies and many more background knowledge. We continuously work on documenting each and every step to increase the readability of our code. diff --git a/analyse_experiments.py b/analyse_experiments.py index c9dac20a..ed7e5e71 100644 --- a/analyse_experiments.py +++ b/analyse_experiments.py @@ -131,8 +131,8 @@ def to_df(self): test_mrr=self.test_mrr, test_h1=self.test_h1, test_h3=self.test_h3, test_h10=self.test_h10, runtime=self.runtime, - params=self.num_params - #callbacks=self.callbacks, + params=self.num_params, + callbacks=self.callbacks, #normalization=self.normalization, #embeddingdim=self.embedding_dim ) @@ -148,6 +148,6 @@ def to_df(self): df = counter.to_df() pd.set_option("display.precision", 3) #print(df) -#print(df.to_latex(index=False,float_format="%.3f")) +print(df.to_latex(index=False,float_format="%.3f")) print(df.to_markdown(index=False)) diff --git a/dicee/__init__.py b/dicee/__init__.py index b9b3c5c9..29da1a62 100644 --- a/dicee/__init__.py +++ b/dicee/__init__.py @@ -3,4 +3,5 @@ from .trainer import DICE_Trainer # noqa from .knowledge_graph_embeddings import KGE # noqa from .executer import Execute # noqa -__version__ = '0.0.4' +from .dataset_classes import * # noqa +__version__ = '0.0.5' diff --git a/dicee/abstracts.py b/dicee/abstracts.py index f277bd6d..f160d7e1 100644 --- a/dicee/abstracts.py +++ b/dicee/abstracts.py @@ -29,8 +29,8 @@ def __init__(self, args, callbacks): self.callbacks = callbacks self.is_global_zero = True # Set True to use Model summary callback of pl. - torch.manual_seed(self.attributes.seed_for_computation) - torch.cuda.manual_seed_all(self.attributes.seed_for_computation) + torch.manual_seed(self.attributes.random_seed) + torch.cuda.manual_seed_all(self.attributes.random_seed) def on_fit_start(self, *args, **kwargs): """ diff --git a/dicee/callbacks.py b/dicee/callbacks.py index b2ead0c7..6912105d 100644 --- a/dicee/callbacks.py +++ b/dicee/callbacks.py @@ -222,7 +222,7 @@ def on_fit_end(self, trainer, model): """ def on_train_epoch_end(self, trainer, model): - self.epoch_counter+=1 + self.epoch_counter += 1 if self.epoch_counter % self.epoch_ratio == 0: model.eval() report = trainer.evaluator.eval(dataset=trainer.dataset, trained_model=model, @@ -230,8 +230,6 @@ def on_train_epoch_end(self, trainer, model): model.train() self.reports.append(report) - - def on_train_batch_end(self, *args, **kwargs): return @@ -296,7 +294,7 @@ def __init__(self, std: float = 0.1, epoch_ratio: int = None): self.epoch_ratio = epoch_ratio if epoch_ratio is not None else 1 self.epoch_counter = 0 - def on_train_epoch_end(self, trainer, model): + def on_train_epoch_start(self, trainer, model): if self.epoch_counter % self.epoch_ratio == 0: with torch.no_grad(): # Access the parameters @@ -304,3 +302,22 @@ def on_train_epoch_end(self, trainer, model): noise_mat = torch.normal(mean=0, std=self.std, size=param.shape, device=model.device) param.add_(noise_mat) self.epoch_counter += 1 + + +class RN(AbstractCallback): + """ Adding Uniform at Random Noise into Inputs/Parameters """ + + def __init__(self, std: float = 0.1, epoch_ratio: int = None): + super().__init__() + self.std = std + self.epoch_ratio = epoch_ratio if epoch_ratio is not None else 1 + self.epoch_counter = 0 + + def on_train_epoch_start(self, trainer, model): + if self.epoch_counter % self.epoch_ratio == 0: + with torch.no_grad(): + # Access the parameters + for param in model.parameters(): + noise_mat = torch.rand(size=param.shape) * self.std + param.add_(noise_mat) + self.epoch_counter += 1 diff --git a/dicee/config.py b/dicee/config.py index abd8d577..c141abb5 100644 --- a/dicee/config.py +++ b/dicee/config.py @@ -16,12 +16,22 @@ def __call__(self, parser, namespace, values, option_string=None): class Namespace(argparse.Namespace): def __init__(self, **kwargs): super().__init__(**kwargs) + "The path of a folder containing train.txt, and/or valid.txt and/or test.txt" self.path_dataset_folder: str = 'KGs/UMLS' + "A flag for saving embeddings in csv file." self.save_embeddings_as_csv: bool = False + "A directory named with time of execution under --storage_path that contains related data about embeddings." self.storage_path: str = 'Experiments' - self.absolute_path_to_store: str = None - self.absolute_path_dataset = None + "A single directory created that contains related data about embeddings." + self.path_to_store_single_run: str = None + "Path of a file corresponding to the input knowledge graph" + self.path_single_kg = None + "An endpoint of a triple store." + self.sparql_endpoint = None + "KGE model" self.model: str = "Keci" + " The ratio of added random triples into training dataset" + self.add_noise_rate: float = None self.p: int = 0 self.q: int = 1 self.optim: str = 'Adam' @@ -50,7 +60,8 @@ def __init__(self, **kwargs): self.kernel_size: int = 3 self.num_of_output_channels: int = 32 self.num_core: int = 0 - self.seed_for_computation: int = 0 + "Random Seed" + self.random_seed: int = 0 self.sample_triples_ratio = None self.read_only_few = None self.pykeen_model_kwargs: ParseDict = dict() diff --git a/dicee/dataset_classes.py b/dicee/dataset_classes.py index 78365b70..77ab90c7 100644 --- a/dicee/dataset_classes.py +++ b/dicee/dataset_classes.py @@ -254,7 +254,7 @@ def __init__(self, train_set: np.ndarray, num_entities, num_relations, neg_sampl label_smoothing_rate: float = 0.0): super().__init__() assert isinstance(train_set, np.ndarray) - assert isinstance(neg_sample_ratio,int) + assert isinstance(neg_sample_ratio, int) self.train_data = train_set self.num_entities = num_entities self.num_relations = num_relations @@ -314,6 +314,35 @@ def __getitem__(self, idx): return x, y_idx, y_vec +class NegSampleDataset(torch.utils.data.Dataset): + def __init__(self, train_set: np.ndarray, num_entities: int, num_relations: int, neg_sample_ratio: int = 1): + assert isinstance(train_set, np.ndarray) + # https://pytorch.org/docs/stable/data.html#multi-process-data-loading + # TLDL; replace Python objects with non-refcounted representations such as Pandas, Numpy or PyArrow objects + self.neg_sample_ratio = torch.tensor( + neg_sample_ratio) # 0 Implies that we do not add negative samples. This is needed during testing and validation + self.train_set = torch.from_numpy(train_set).unsqueeze(1) + # assert num_entities >= max(self.train_set[:, 0]) and num_entities >= max(self.train_set[:, 2]) + self.length = len(self.train_set) + self.num_entities = torch.tensor(num_entities) + self.num_relations = torch.tensor(num_relations) + + def __len__(self): + return self.length + + def __getitem__(self, idx): + # Generate negative sample + + triple = self.train_set[idx] + + corr_entities = torch.randint(0, high=self.num_entities, size=(1,)) + negative_triple = torch.cat((triple[:, 0], triple[:, 1], corr_entities), dim=0).unsqueeze(0) + + x = torch.cat((triple, negative_triple), dim=0) + y=torch.tensor([1.0, 0.0]) + return x,y + + class TriplePredictionDataset(torch.utils.data.Dataset): """ Triple Dataset @@ -426,7 +455,6 @@ def collate_fn(self, batch: List[torch.Tensor]): return x, label - class CVDataModule(pl.LightningDataModule): """ Create a Dataset for cross validation diff --git a/dicee/evaluator.py b/dicee/evaluator.py index d1afad67..d878f636 100644 --- a/dicee/evaluator.py +++ b/dicee/evaluator.py @@ -2,6 +2,7 @@ import numpy as np import json from .static_funcs import pickle +from .static_funcs_training import evaluate_lp class Evaluator: @@ -263,6 +264,14 @@ def evaluate_lp_k_vs_all(self, model, triple_idx, info=None, form_of_labelling=N def evaluate_lp(self, model, triple_idx, info): """ + + """ + # @TODO: Document this method + return evaluate_lp(model, triple_idx, num_entities=self.num_entities, + er_vocab=self.er_vocab,re_vocab=self.re_vocab,info=info) + + def dept_evaluate_lp(self, model, triple_idx, info): + """ Evaluate model in a standard link prediction task for each triple diff --git a/dicee/executer.py b/dicee/executer.py index c6a42325..30dd7e9f 100644 --- a/dicee/executer.py +++ b/dicee/executer.py @@ -36,7 +36,7 @@ def __init__(self, args, continuous_training=False): # (1) Process arguments and sanity checking. self.args = preprocesses_input_args(args) # (2) Ensure reproducibility. - seed_everything(args.seed_for_computation, workers=True) + seed_everything(args.random_seed, workers=True) # (3) Set the continual training flag self.is_continual_training = continuous_training # (4) Create an experiment folder or use the previous one @@ -198,9 +198,7 @@ def start(self) -> dict: self.trainer = DICE_Trainer(args=self.args, is_continual_training=self.is_continual_training, storage_path=self.storage_path, - evaluator=self.evaluator, - dataset=self.dataset # only used for Pykeen's models - ) + evaluator=self.evaluator) # (4) Start the training self.trained_model, form_of_labelling = self.trainer.start(dataset=self.dataset) return self.end(form_of_labelling) diff --git a/dicee/knowledge_graph.py b/dicee/knowledge_graph.py index cbc915c7..b8b0793d 100644 --- a/dicee/knowledge_graph.py +++ b/dicee/knowledge_graph.py @@ -7,15 +7,21 @@ class KG: """ Knowledge Graph """ - def __init__(self, data_dir: str = None, absolute_path_dataset:str=None, + def __init__(self, data_dir: str = None, + add_noise_rate:float=None, + sparql_endpoint:str=None, + path_single_kg:str=None, path_for_deserialization: str = None, add_reciprical: bool = None, eval_model: str = None, read_only_few: int = None, sample_triples_ratio: float = None, path_for_serialization: str = None, entity_to_idx=None, relation_to_idx=None, backend=None): """ + @TODO: Renzhong, can you please update/modify the docstrings. :param data_dir: A path of a folder containing the input knowledge graph - :param absolute_path_dataset: The path of a single file containing the input knowledge graph + :param add_noise_rate: Noisy triples added into the training adataset by x % of its size. + : param sparql_endpoint: An endpoint of a triple store + :param path_single_kg: The path of a single file containing the input knowledge graph :param path_for_deserialization: A path of a folder containing previously parsed data :param num_core: Number of subprocesses used for data loading :param add_reciprical: A flag for applying reciprocal data augmentation technique @@ -24,10 +30,12 @@ def __init__(self, data_dir: str = None, absolute_path_dataset:str=None, :param add_noise_rate: Add say 10% noise in the input data sample_triples_ratio """ + self.sparql_endpoint = sparql_endpoint + self.add_noise_rate = add_noise_rate self.num_entities = None self.num_relations = None self.data_dir = data_dir - self.absolute_path_dataset=absolute_path_dataset + self.path_single_kg=path_single_kg self.path_for_deserialization = path_for_deserialization self.add_reciprical = add_reciprical self.eval_model = eval_model @@ -62,7 +70,8 @@ def _describe(self) -> None: f'{len(self.train_set)}' \ f'\nNumber of triples on valid set:' \ f'{len(self.valid_set) if self.valid_set is not None else 0}' \ - f'\nNumber of triples on test set:{len(self.test_set) if self.test_set is not None else 0}\n' + f'\nNumber of triples on test set:' \ + f'{len(self.test_set) if self.test_set is not None else 0}\n' self.description_of_input += f"Entity Index:{sys.getsizeof(self.entity_to_idx) / 1_000_000_000:.5f} in GB\n" self.description_of_input += f"Relation Index:{sys.getsizeof(self.relation_to_idx) / 1_000_000_000:.5f} in GB\n" self.description_of_input += f"Train set :{self.train_set.nbytes / 1_000_000_000:.5f} in GB\n" diff --git a/dicee/knowledge_graph_embeddings.py b/dicee/knowledge_graph_embeddings.py index ba4bfb6a..40d3df09 100644 --- a/dicee/knowledge_graph_embeddings.py +++ b/dicee/knowledge_graph_embeddings.py @@ -5,11 +5,11 @@ from .abstracts import BaseInteractiveKGE from .dataset_classes import TriplePredictionDataset from .static_funcs import random_prediction, deploy_triple_prediction, deploy_tail_entity_prediction, \ - deploy_relation_prediction, deploy_head_entity_prediction + deploy_relation_prediction, deploy_head_entity_prediction, load_pickle +from .static_funcs_training import evaluate_lp import numpy as np import sys - class KGE(BaseInteractiveKGE): """ Knowledge Graph Embedding Class for interactive usage of pre-trained models""" @@ -21,7 +21,20 @@ def __init__(self, path, construct_ensemble=False, apply_semantic_constraint=apply_semantic_constraint) def __str__(self): - return 'KGE | ' + str(self.model) + return "KGE | " + str(self.model) + + def eval_lp_performance(self, dataset=List[Tuple[str, str, str]], filtered=True): + assert isinstance(dataset, list) and len(dataset) > 0 + + idx_dataset = np.array( + [(self.entity_to_idx[s], self.relation_to_idx[p], self.entity_to_idx[o]) for s, p, o in dataset]) + if filtered: + return evaluate_lp(model=self.model, triple_idx=idx_dataset, num_entities=len(self.entity_to_idx), + er_vocab=load_pickle(self.path + '/er_vocab.p'), + re_vocab=load_pickle(self.path + '/re_vocab.p')) + else: + return evaluate_lp(model=self.model, triple_idx=idx_dataset, num_entities=len(self.entity_to_idx), + er_vocab=None, re_vocab=None) def predict_missing_head_entity(self, relation: List[str], tail_entity: List[str]) -> Tuple: """ @@ -58,7 +71,6 @@ def predict_missing_head_entity(self, relation: List[str], tail_entity: List[str tail_entity.repeat(self.num_entities, )), dim=1) return self.model.forward(x) - def predict_missing_relations(self, head_entity: List[str], tail_entity: List[str]) -> Tuple: """ Given a head entity and a tail entity, return top k ranked relations. @@ -121,8 +133,8 @@ def predict_missing_tail_entity(self, head_entity: List[str], relation: List[str scores """ - x=torch.cat((torch.LongTensor([self.entity_to_idx[i] for i in head_entity]).unsqueeze(-1), - torch.LongTensor([self.relation_to_idx[i] for i in relation]).unsqueeze(-1)), dim=1) + x = torch.cat((torch.LongTensor([self.entity_to_idx[i] for i in head_entity]).unsqueeze(-1), + torch.LongTensor([self.relation_to_idx[i] for i in relation]).unsqueeze(-1)), dim=1) return self.model.forward(x) def predict(self, *, h: List[str] = None, r: List[str] = None, t: List[str] = None): diff --git a/dicee/models/base_model.py b/dicee/models/base_model.py index 1e1b7c0d..b478ce9a 100644 --- a/dicee/models/base_model.py +++ b/dicee/models/base_model.py @@ -111,11 +111,11 @@ def init_params_with_sanity_checking(self): self.normalizer_class = IdentityClass else: raise NotImplementedError() - if self.args.get("optim") in ['Adan', 'NAdam', 'Adam', 'SGD', 'ASGD', 'Sls', 'AdamSLS']: + if self.args.get("optim") in ['NAdam', 'Adam', 'SGD']: self.optimizer_name = self.args['optim'] else: print(f'--optim (***{self.args.get("optim")}***) not found') - self.optimizer_name='Adam' + self.optimizer_name = 'Adam' if self.args.get("init_param") is None: self.param_init = IdentityClass @@ -123,7 +123,7 @@ def init_params_with_sanity_checking(self): self.param_init = torch.nn.init.xavier_normal_ else: print(f'--init_param (***{self.args.get("init_param")}***) not found') - self.optimizer_name=IdentityClass + self.optimizer_name = IdentityClass def get_embeddings(self) -> Tuple[np.ndarray, np.ndarray]: # @TODO why twice data.data.? @@ -156,7 +156,7 @@ def configure_optimizers(self, parameters=None): else: raise KeyError() return self.selected_optimizer - + """ def get_optimizer_class(self): # default params in pytorch. if self.optimizer_name == 'SGD': @@ -165,9 +165,9 @@ def get_optimizer_class(self): return torch.optim.Adam else: raise KeyError() - + """ def loss_function(self, yhat_batch, y_batch): - return self.loss(input=yhat_batch, target=y_batch) + return self.loss(yhat_batch, y_batch) def forward_triples(self, *args, **kwargs): raise ValueError(f'MODEL:{self.name} does not have forward_triples function') @@ -200,6 +200,7 @@ def forward(self, x: Union[torch.LongTensor, Tuple[torch.LongTensor, torch.LongT else: return self.forward_sequence(x=x) + """ def training_step(self, batch, batch_idx): if len(batch) == 2: x_batch, y_batch = batch @@ -212,6 +213,13 @@ def training_step(self, batch, batch_idx): raise ValueError('Unexpected batch shape..') train_loss = self.loss_function(yhat_batch=yhat_batch, y_batch=y_batch) return train_loss + """ + + def training_step(self, batch, batch_idx=None): + x_batch, y_batch = batch + yhat_batch = self.forward(x_batch) + loss_batch = self.loss_function(yhat_batch, y_batch) + return loss_batch def training_epoch_end(self, training_step_outputs): batch_losses = [i['loss'].item() for i in training_step_outputs] @@ -220,6 +228,7 @@ def training_epoch_end(self, training_step_outputs): def validation_step(self, batch, batch_idx): """ + @ TODO # from torchmetrics import Accuracy as accuracy if len(batch) == 4: h, r, t, y_batch = batch @@ -235,6 +244,8 @@ def validation_step(self, batch, batch_idx): def validation_epoch_end(self, outputs: List[Any]) -> None: """ + @ TODO + x = [[x['val_acc'], x['val_loss']] for x in outputs] avg_val_acc, avg_loss = torch.tensor(x).mean(dim=0)[:] self.log('avg_loss_per_epoch', avg_loss, on_epoch=True, prog_bar=True) @@ -243,6 +254,8 @@ def validation_epoch_end(self, outputs: List[Any]) -> None: def test_step(self, batch, batch_idx): """ + @ TODO + if len(batch) == 4: h, r, t, y_batch = batch predictions = self.forward_triples(h, r, t) @@ -255,6 +268,7 @@ def test_step(self, batch, batch_idx): def test_epoch_end(self, outputs: List[Any]): """ + @ TODO avg_test_accuracy = torch.stack([x['test_accuracy'] for x in outputs]).mean() self.log('avg_test_accuracy', avg_test_accuracy, on_epoch=True, prog_bar=True) """ diff --git a/dicee/models/pykeen_models.py b/dicee/models/pykeen_models.py index e8515552..4bf0884e 100644 --- a/dicee/models/pykeen_models.py +++ b/dicee/models/pykeen_models.py @@ -5,6 +5,7 @@ import pickle from pykeen.models import model_resolver from .base_model import BaseKGE +import collections def load_numpy(path) -> np.ndarray: @@ -22,42 +23,126 @@ def load_pickle(*, file_path=str): class PykeenKGE(BaseKGE): """ A class for using knowledge graph embedding models implemented in Pykeen - Each model can be trained with KvsAll or NegSample scoring techniques. - Each model must be trained with PL + Notes: + Pykeen_DistMult: C + Pykeen_ComplEx: + Pykeen_QuatE: + Pykeen_MuRE: + Pykeen_CP: + Pykeen_HolE: + Pykeen_HolE: - Using our custom trainers seems to introduce a memory leak. + Training Pykeen_QuatE with KvsAll seems to continuously increase the memory usage """ - def __init__(self, args: dict, dataset): + + def __init__(self, args: dict): super().__init__(args) self.model_kwargs = {'embedding_dim': args['embedding_dim'], 'entity_initializer': None if args['init_param'] is None else torch.nn.init.xavier_normal_, - # 'entity_constrainer': None, for complex doesn't work but for distmult does - # 'regularizer': None works for ComplEx and DistMult but does not work for QuatE + #"entity_regularizer": None, + #"relation_regularizer": None, + "random_seed": args["random_seed"] } self.model_kwargs.update(args['pykeen_model_kwargs']) self.name = args['model'].split("_")[1] - self.model = model_resolver.make(self.name, self.model_kwargs, triples_factory=dataset.training) + + # Solving memory issue of Pykeen models caused by the regularizers + # See https://github.com/pykeen/pykeen/issues/1297 + + if self.name == "QuatE": + self.model_kwargs["entity_regularizer"] = None + self.model_kwargs["relation_regularizer"] = None + elif self.name == "DistMult": + self.model_kwargs["regularizer"] = None + elif self.name == "BoxE": + pass + elif self.name == "CP": + # No regularizers + pass + elif self.name == "HolE": + # No regularizers + pass + elif self.name == "ProjE": + # Nothing + pass + elif self.name == "RotatE": + pass + elif self.name == "TransE": + self.model_kwargs["regularizer"] = None + else: + print("Pykeen model have a memory leak caused by their implementation of requirlizers") + print(f"{self.name} does not seem to have any requirlizer") + + self.model = model_resolver. \ + make(self.name, self.model_kwargs, triples_factory= + collections.namedtuple('triples_factory', ['num_entities', 'num_relations', 'create_inverse_triples'])( + self.num_entities, self.num_relations, False)) self.loss_history = [] self.args = args - assert self.args['trainer']=='PL' + self.entity_embeddings = None + self.relation_embeddings = None + for (k, v) in self.model.named_modules(): + if "entity_representations" == k: + self.entity_embeddings = v[0]._embeddings + elif "relation_representations" == k: + self.relation_embeddings = v[0]._embeddings + elif "interaction" == k: + self.interaction = v + else: + pass + """ + if self.entity_embeddings.embedding_dim == 4 * self.embedding_dim: + self.last_dim = 4 + elif self.entity_embeddings.embedding_dim == 2 * self.embedding_dim: + self.last_dim = 2 + elif self.entity_embeddings.embedding_dim == self.embedding_dim: + self.last_dim = 0 + else: + raise NotImplementedError(self.entity_embeddings.embedding_dim) + """ def forward_k_vs_all(self, x: torch.LongTensor): + """ + # => Explicit version by this we can apply bn and dropout + + # (1) Retrieve embeddings of heads and relations + apply Dropout & Normalization if given. + h, r = self.get_head_relation_representation(x) + # (2) Reshape (1). + if self.last_dim > 0: + h = h.reshape(len(x), self.embedding_dim, self.last_dim) + r = r.reshape(len(x), self.embedding_dim, self.last_dim) + # (3) Reshape all entities. + if self.last_dim > 0: + t = self.entity_embeddings.weight.reshape(self.num_entities, self.embedding_dim, self.last_dim) + else: + t = self.entity_embeddings.weight + # (4) Call the score_t from interactions to generate triple scores. + return self.interaction.score_t(h=h, r=r, all_entities=t, slice_size=1) + """ + return self.model.score_t(x) + def forward_triples(self, x: torch.LongTensor) -> torch.FloatTensor: + """ + # => Explicit version by this we can apply bn and dropout + + # (1) Retrieve embeddings of heads, relations and tails and apply Dropout & Normalization if given. + h, r, t = self.get_triple_representation(x) + # (2) Reshape (1). + if self.last_dim > 0: + h = h.reshape(len(x), self.embedding_dim, self.last_dim) + r = r.reshape(len(x), self.embedding_dim, self.last_dim) + t = t.reshape(len(x), self.embedding_dim, self.last_dim) + # (3) Compute the triple score + return self.interaction.score(h=h, r=r, t=t, slice_size=None, slice_dim=0) + """ + return self.model.score_hrt(hrt_batch=x, mode=None).flatten() + def forward_k_vs_sample(self, x: torch.LongTensor, target_entity_idx): raise NotImplementedError() - def forward_triples(self, x: torch.LongTensor): - return self.model.score_hrt(x).flatten() - def forward(self, x: Union[torch.LongTensor, Tuple[torch.LongTensor, torch.LongTensor]], y_idx: torch.LongTensor = None): - """ - - :param x: a batch of inputs - :param y_idx: index of selected output labels. - :return: - """ if isinstance(x, tuple): x, y_idx = x return self.forward_k_vs_sample(x=x, target_entity_idx=y_idx) @@ -71,14 +156,3 @@ def forward(self, x: Union[torch.LongTensor, Tuple[torch.LongTensor, torch.LongT return self.forward_k_vs_all(x=x) else: return self.forward_sequence(x=x) - - def training_step(self, batch, batch_idx): - x_batch, y_batch = batch - yhat_batch = self.forward(x_batch) - loss_batch = self.loss_function(yhat_batch, y_batch) - return loss_batch + self.model.collect_regularization_term() - - def training_epoch_end(self, training_step_outputs): - batch_losses = [i['loss'].item() for i in training_step_outputs] - avg = sum(batch_losses) / len(batch_losses) - self.loss_history.append(avg) diff --git a/dicee/models/real.py b/dicee/models/real.py index efa16fed..0647804b 100644 --- a/dicee/models/real.py +++ b/dicee/models/real.py @@ -33,7 +33,8 @@ def forward_k_vs_sample(self, x: torch.LongTensor, target_entity_idx: torch.Long t = self.entity_embeddings(target_entity_idx).transpose(1, 2) return torch.bmm(hr, t).squeeze(1) - + def score(self,h,r,t): + return (self.hidden_dropout(self.hidden_normalizer(h * r)) * t).sum(dim=1) class TransE(BaseKGE): """ Translating Embeddings for Modeling diff --git a/dicee/read_preprocess_save_load_kg/preprocess.py b/dicee/read_preprocess_save_load_kg/preprocess.py index eead1b80..1aab27a7 100644 --- a/dicee/read_preprocess_save_load_kg/preprocess.py +++ b/dicee/read_preprocess_save_load_kg/preprocess.py @@ -1,9 +1,10 @@ import pandas as pd import time -import polars +import polars as pl from .util import create_recipriocal_triples, timeit, index_triples_with_pandas, dataset_sanity_checking from dicee.static_funcs import numpy_data_type_changer + class PreprocessKG: """ Preprocess the data in memory """ @@ -23,18 +24,16 @@ def start(self) -> None: """ if self.kg.backend == 'polars': self.preprocess_with_polars() - elif self.kg.backend in ['pandas', 'modin']: + elif self.kg.backend == 'pandas': self.preprocess_with_pandas() else: raise KeyError(f'{self.kg.backend} not found') - - print('Data Type conversion...') + print('Finding suitable integer type for the index...') self.kg.train_set = numpy_data_type_changer(self.kg.train_set, num=max(self.kg.num_entities, self.kg.num_relations)) if self.kg.valid_set is not None: self.kg.valid_set = numpy_data_type_changer(self.kg.valid_set, num=max(self.kg.num_entities, self.kg.num_relations)) - if self.kg.test_set is not None: self.kg.test_set = numpy_data_type_changer(self.kg.test_set, num=max(self.kg.num_entities, self.kg.num_relations)) @@ -102,35 +101,35 @@ def adding_reciprocal_triples(): """ Add reciprocal triples """ # (1.1) Add reciprocal triples into training set self.kg.train_set.extend(self.kg.train_set.select([ - polars.col("object").alias('subject'), - polars.col("relation").apply(lambda x: x + '_inverse'), - polars.col("subject").alias('object') + pl.col("object").alias('subject'), + pl.col("relation").apply(lambda x: x + '_inverse'), + pl.col("subject").alias('object') ])) if self.kg.valid_set is not None: # (1.2) Add reciprocal triples into valid_set set. self.kg.valid_set.extend(self.kg.valid_set.select([ - polars.col("object").alias('subject'), - polars.col("relation").apply(lambda x: x + '_inverse'), - polars.col("subject").alias('object') + pl.col("object").alias('subject'), + pl.col("relation").apply(lambda x: x + '_inverse'), + pl.col("subject").alias('object') ])) if self.kg.test_set is not None: # (1.2) Add reciprocal triples into test set. self.kg.test_set.extend(self.kg.test_set.select([ - polars.col("object").alias('subject'), - polars.col("relation").apply(lambda x: x + '_inverse'), - polars.col("subject").alias('object') + pl.col("object").alias('subject'), + pl.col("relation").apply(lambda x: x + '_inverse'), + pl.col("subject").alias('object') ])) - print('Adding Reciprocal Triples...', end=' ') + print('Adding Reciprocal Triples...') adding_reciprocal_triples() # (2) Type checking try: - assert isinstance(self.kg.train_set, polars.DataFrame) + assert isinstance(self.kg.train_set, pl.DataFrame) except TypeError: raise TypeError(f"{type(self.kg.train_set)}") - assert isinstance(self.kg.valid_set, polars.DataFrame) or self.kg.valid_set is None - assert isinstance(self.kg.test_set, polars.DataFrame) or self.kg.test_set is None + assert isinstance(self.kg.valid_set, pl.DataFrame) or self.kg.valid_set is None + assert isinstance(self.kg.test_set, pl.DataFrame) or self.kg.test_set is None def concat_splits(train, val, test): x = [train] @@ -138,75 +137,39 @@ def concat_splits(train, val, test): x.append(val) if test is not None: x.append(test) - return polars.concat(x) + return pl.concat(x) - print('Concat Splits...', end=' ') + print('Concat Splits...') df_str_kg = concat_splits(self.kg.train_set, self.kg.valid_set, self.kg.test_set) - @timeit - def entity_index(): - """ Create a mapping from str representation of entities/nodes to integers""" - # Entity Index: {'a':1, 'b':2} : - return polars.concat((df_str_kg['subject'], df_str_kg['object'])).unique(maintain_order=True).rename( - 'entity') - - print('Entity Indexing...', end=' ') - self.kg.entity_to_idx = entity_index() - - @timeit - def relation_index(): - """ Create a mapping from str representation of relations/edges to integers""" - # Relation Index: {'r1':1, 'r2:'2} - return df_str_kg['relation'].unique(maintain_order=True) - - print('Relation Indexing...', end=' ') - self.kg.relation_to_idx = relation_index() - # On YAGO3-10 # 2.90427 and 0.00065 MB, - # print(f'Est. size of entity_to_idx in Polars:{self.kg.entity_to_idx.estimated_size(unit="mb"):.5f} in MB') - # print(f'Est. of relation_to_idx in Polars:{self.kg.relation_to_idx.estimated_size(unit="mb"):.5f} in MB') - self.kg.entity_to_idx = dict(zip(self.kg.entity_to_idx.to_list(), list(range(len(self.kg.entity_to_idx))))) - self.kg.relation_to_idx = dict( - zip(self.kg.relation_to_idx.to_list(), list(range(len(self.kg.relation_to_idx))))) - # On YAGO3-10, 5.24297 in MB and 0.00118 in MB - # print(f'Estimated size of entity_to_idx in Python dict:{sys.getsizeof(self.kg.entity_to_idx) / 1000000 :.5f} in MB') - # print(f'Estimated size of relation_to_idx in Python dict:{sys.getsizeof(self.kg.relation_to_idx) / 1000000 :.5f} in MB') + print('Entity Indexing...') + self.kg.entity_to_idx = pl.concat((df_str_kg['subject'], + df_str_kg['object'])).unique(maintain_order=True).rename('entity') + print('Relation Indexing...') + self.kg.relation_to_idx =df_str_kg['relation'].unique(maintain_order=True) + print('Creating index for entities...') + self.kg.entity_to_idx = {ent: idx for idx, ent in enumerate(self.kg.entity_to_idx.to_list())} + print('Creating index for relations...') + self.kg.relation_to_idx = {rel: idx for idx, rel in enumerate(self.kg.relation_to_idx.to_list())} self.kg.num_entities, self.kg.num_relations = len(self.kg.entity_to_idx), len(self.kg.relation_to_idx) - # using this requires more time and more mem. Maybe there is a bug somewhere in polars. - def indexer(data): - """ Apply str to int mapping on an input data""" - # These column assignments are executed in parallel - # with_colums allows you to create new columns for you analyses. - # https://pola-rs.github.io/polars-book/user-guide/quickstart/quick-exploration-guide.html#with_columns - return data.with_columns([polars.col("subject").apply(lambda x: self.kg.entity_to_idx[x]), - polars.col("relation").apply(lambda x: self.kg.relation_to_idx[x]), - polars.col("object").apply(lambda x: self.kg.entity_to_idx[x])]) - - @timeit - def from_pandas_to_numpy(df): - # Index pandas dataframe? - print(f'Convering data to Pandas {df.shape}...') - df = df.to_pandas() - # Index pandas dataframe? - print(f'Indexing Training Data {df.shape}...') - return index_triples_with_pandas(df, entity_to_idx=self.kg.entity_to_idx, - relation_to_idx=self.kg.relation_to_idx).to_numpy() - print(f'Indexing Training Data {self.kg.train_set.shape}...') - self.kg.train_set = from_pandas_to_numpy(self.kg.train_set) - # self.kg.train_set = from_polars_to_numpy() - # we may try vaex to speed up pandas - # https://stackoverflow.com/questions/69971992/vaex-apply-does-not-work-when-using-dataframe-columns - - print(f'Estimated size of train_set in Numpy: {self.kg.train_set.nbytes / 1_000_000_000 :.5f} in GB') + self.kg.train_set = self.kg.train_set.with_columns( + pl.col("subject").map_dict(self.kg.entity_to_idx).alias("subject"), + pl.col("relation").map_dict(self.kg.relation_to_idx).alias("relation"), + pl.col("object").map_dict(self.kg.entity_to_idx).alias("object")).to_numpy() if self.kg.valid_set is not None: print(f'Indexing Val Data {self.kg.valid_set.shape}...') - self.kg.valid_set = from_pandas_to_numpy(self.kg.valid_set) - print(f'Estimated size of valid_set in Numpy: {self.kg.valid_set.nbytes / 1_000_000_000:.5f} in GB') + self.kg.valid_set = self.kg.valid_set.with_columns( + pl.col("subject").map_dict(self.kg.entity_to_idx).alias("subject"), + pl.col("relation").map_dict(self.kg.relation_to_idx).alias("relation"), + pl.col("object").map_dict(self.kg.entity_to_idx).alias("object")).to_numpy() if self.kg.test_set is not None: print(f'Indexing Test Data {self.kg.test_set.shape}...') - self.kg.test_set = from_pandas_to_numpy(self.kg.test_set) - print(f'Estimated size of test_set in Numpy: {self.kg.test_set.nbytes / 1_000_000_000:.5f} in GB') + self.kg.test_set = self.kg.test_set.with_columns( + pl.col("subject").map_dict(self.kg.entity_to_idx).alias("subject"), + pl.col("relation").map_dict(self.kg.relation_to_idx).alias("relation"), + pl.col("object").map_dict(self.kg.entity_to_idx).alias("object")).to_numpy() print(f'*** Preprocessing Train Data:{self.kg.train_set.shape} with Polars DONE ***') def sequential_vocabulary_construction(self) -> None: diff --git a/dicee/read_preprocess_save_load_kg/read_from_disk.py b/dicee/read_preprocess_save_load_kg/read_from_disk.py index 91def7b9..3f222cd4 100644 --- a/dicee/read_preprocess_save_load_kg/read_from_disk.py +++ b/dicee/read_preprocess_save_load_kg/read_from_disk.py @@ -1,5 +1,8 @@ -from .util import read_from_disk +from .util import read_from_disk,read_from_triple_store import glob +import pandas as pd +import numpy as np + class ReadFromDisk: """Read the data from disk into memory""" @@ -20,19 +23,48 @@ def start(self) -> None: ------- None """ - if self.kg.absolute_path_dataset is not None: - self.kg.train_set = read_from_disk(self.kg.absolute_path_dataset, self.kg.read_only_few, self.kg.sample_triples_ratio, + if self.kg.path_single_kg is not None: + self.kg.train_set = read_from_disk(self.kg.path_single_kg, + self.kg.read_only_few, + self.kg.sample_triples_ratio, backend=self.kg.backend) self.kg.valid_set = None - self.kg.test_set=None + self.kg.test_set = None + elif self.kg.sparql_endpoint: + self.kg.train_set=read_from_triple_store(endpoint=self.kg.sparql_endpoint) + self.kg.valid_set = None + self.kg.test_set = None + else: for i in glob.glob(self.kg.data_dir + '/*'): if 'train' in i: self.kg.train_set = read_from_disk(i, self.kg.read_only_few, self.kg.sample_triples_ratio, backend=self.kg.backend) + if self.kg.add_noise_rate: + self.add_noisy_triples() + elif 'test' in i and self.kg.eval_model is not None: self.kg.test_set = read_from_disk(i, backend=self.kg.backend) elif 'valid' in i and self.kg.eval_model is not None: self.kg.valid_set = read_from_disk(i, backend=self.kg.backend) else: print(f'Unrecognized data {i}') + + def add_noisy_triples(self): + num_noisy_triples = int(len(self.kg.train_set) * self.kg.add_noise_rate) + s = len(self.kg.train_set) + # @TODO: Can we use polars here ? + list_of_entities = pd.unique(self.kg.train_set[['subject', 'object']].values.ravel('K')) + self.kg.train_set = pd.concat([self.kg.train_set, + # Noisy triples + pd.DataFrame( + {'subject': np.random.choice(list_of_entities, num_noisy_triples), + 'relation': np.random.choice( + pd.unique(self.kg.train_set[['relation']].values.ravel('K')), + num_noisy_triples), + 'object': np.random.choice(list_of_entities, num_noisy_triples)} + ) + ], ignore_index=True) + + assert s + num_noisy_triples == len(self.kg.train_set) + diff --git a/dicee/read_preprocess_save_load_kg/util.py b/dicee/read_preprocess_save_load_kg/util.py index 488e8efd..a387c23a 100644 --- a/dicee/read_preprocess_save_load_kg/util.py +++ b/dicee/read_preprocess_save_load_kg/util.py @@ -8,6 +8,8 @@ import pickle import os import psutil +import requests +from rdflib import Graph def timeit(func): @@ -18,10 +20,9 @@ def timeit_wrapper(*args, **kwargs): end_time = time.perf_counter() total_time = end_time - start_time print( - f'Took {total_time:.4f} seconds' - f'|Current Memory Usage {psutil.Process(os.getpid()).memory_info().rss / 1000000: .5} in MB') + f'{func.__name__} took {total_time:.4f} seconds ' + f'| Current Memory Usage {psutil.Process(os.getpid()).memory_info().rss / 1000000: .5} in MB') return result - return timeit_wrapper @@ -29,7 +30,7 @@ def timeit_wrapper(*args, **kwargs): def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio: float = None) -> polars.DataFrame: """ Load and Preprocess via Polars """ print(f'*** Reading {data_path} with Polars ***') - # (1) Load the data + # (1) Load the data. if data_path[-3:] in ['txt', 'csv']: print('Reading with polars.read_csv with sep **t** ...') df = polars.read_csv(data_path, @@ -45,9 +46,7 @@ def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio: df = polars.read_parquet(data_path, use_pyarrow=True) else: df = polars.read_parquet(data_path, n_rows=read_only_few) - - print(f'Estimated size of the Polars Dataframe: {df.estimated_size() / 1000000} in MB') - # (2) Sample from (1) + # (2) Sample from (1). if sample_triples_ratio: print(f'Subsampling {sample_triples_ratio} of input data {df.shape}...') df = df.sample(frac=sample_triples_ratio) @@ -96,11 +95,15 @@ def read_with_pandas(data_path, read_only_few: int = None, sample_triples_ratio: return df -def read_from_disk(data_path, read_only_few: int = None, +def read_from_disk(data_path: str, read_only_few: int = None, sample_triples_ratio: float = None, backend=None): assert backend # If path exits if glob.glob(data_path): + if data_path[data_path.find(".") + 1:] in ["owl", "nt", "turtle", "rdf/xml", "n3", " n-triples"]: + return pd.DataFrame(data=[(s, p, o) for s, p, o in Graph().parse(data_path)], + columns=['subject', 'relation', 'object'], dtype=str) + if backend == 'pandas': return read_with_pandas(data_path, read_only_few, sample_triples_ratio) elif backend == 'polars': @@ -112,6 +115,19 @@ def read_from_disk(data_path, read_only_few: int = None, return None +def read_from_triple_store(endpoint: str = None): + """ Read triples from triple store into pandas dataframe """ + assert endpoint is not None + assert isinstance(endpoint, str) + query = """SELECT ?subject ?predicate ?object WHERE { ?subject ?predicate ?object}""" + response = requests.post(endpoint, data={'query': query}) + assert response.ok + # Generator + triples = ([triple['subject']['value'], triple['predicate']['value'], triple['object']['value']] for triple in + response.json()['results']['bindings']) + return pd.DataFrame(data=triples, index=None, columns=["subject", "relation", "object"], dtype=str) + + def get_er_vocab(data, file_path: str = None): # head entity and relation er_vocab = defaultdict(list) diff --git a/dicee/sanity_checkers.py b/dicee/sanity_checkers.py index 47dfda4f..3e7c03ec 100644 --- a/dicee/sanity_checkers.py +++ b/dicee/sanity_checkers.py @@ -1,5 +1,17 @@ import os import glob +import requests + + +def is_sparql_endpoint_alive(sparql_endpoint: str = None): + if sparql_endpoint: + query = """SELECT (COUNT(*) as ?num_triples) WHERE { ?s ?p ?o .} """ + response = requests.post(sparql_endpoint, data={'query': query}) + assert response.ok + print('SPARQL connection is successful') + return response.ok + else: + return False def sanity_checking_with_arguments(args): @@ -20,11 +32,24 @@ def sanity_checking_with_arguments(args): except AssertionError: print(f'num_folds_for_cv can not be negative. Currently:{args.num_folds_for_cv}') raise - # Check whether is a directory or a file? - if args.absolute_path_dataset is not None: - assert args.path_dataset_folder is None - else: - assert isinstance(args.path_dataset_folder,str) + # (1) Check + if is_sparql_endpoint_alive(args.sparql_endpoint): + try: + assert args.path_dataset_folder is None and args.path_single_kg is None + except AssertionError: + raise RuntimeWarning(f'The path_dataset_folder and path_single_kg arguments ' + f'must be None if sparql_endpoint is given.' + f'***{args.path_dataset_folder}***\n' + f'***{args.path_single_kg}***\n' + f'These two parameters are set to None.') + args.path_dataset_folder = None + args.path_single_kg = None + elif args.path_dataset_folder is not None: + try: + assert isinstance(args.path_dataset_folder, str) + except AssertionError: + raise AssertionError(f'The path_dataset_folder must be string sparql_endpoint is not given.' + f'***{args.path_dataset_folder}***') try: assert os.path.isdir(args.path_dataset_folder) or os.path.isfile(args.path_dataset_folder) except AssertionError: @@ -32,6 +57,7 @@ def sanity_checking_with_arguments(args): f'***{args.path_dataset_folder}***') # Check whether the input parameter leads a standard data format (e.g. FOLDER/train.txt) # or a data in the parquet format + # @TODO: Rethink about this computation. if '.parquet' == args.path_dataset_folder[-8:]: """ all is good we have xxx.parquet data""" elif glob.glob(args.path_dataset_folder + '/train*'): @@ -41,6 +67,16 @@ def sanity_checking_with_arguments(args): f'Data format is not recognized.' f'\nThe path_dataset_folder parameter **{args.path_dataset_folder}** must lead to' f'(a) **folder/train.txt** or *** triples stored in the parquet format') + elif args.path_single_kg is not None: + assert args.path_dataset_folder is None + elif args.path_dataset_folder is None and args.path_single_kg is None and args.sparql_endpoint is None: + raise RuntimeError(f" Following arguments cannot be all None:" + f"path_dataset_folder:{args.path_dataset_folder},\t" + f"path_single_kg:{args.path_single_kg},\t" + f"sparql_endpoint:{args.sparql_endpoint}.") + else: + raise RuntimeError('Invalid computation flow!') + def config_kge_sanity_checking(args, dataset): """ diff --git a/dicee/static_funcs.py b/dicee/static_funcs.py index bc09ad52..f340ad76 100644 --- a/dicee/static_funcs.py +++ b/dicee/static_funcs.py @@ -14,9 +14,6 @@ import psutil from .models.base_model import BaseKGE import pickle -from pykeen.datasets.base import EagerDataset -from pykeen.triples.triples_factory import TriplesFactory - def timeit(func): @functools.wraps(func) @@ -26,8 +23,8 @@ def timeit_wrapper(*args, **kwargs): end_time = time.perf_counter() total_time = end_time - start_time print( - f'Took {total_time:.4f} seconds' - f'|Current Memory Usage {psutil.Process(os.getpid()).memory_info().rss / 1000000: .5} in MB') + f'Took {total_time:.4f} secs ' + f'| Current Memory Usage {psutil.Process(os.getpid()).memory_info().rss / 1000000: .5} in MB') return result return timeit_wrapper @@ -43,7 +40,7 @@ def load_pickle(file_path=str): # @TODO: Could these funcs can be merged? -def select_model(args: dict, is_continual_training: bool = None, storage_path: str = None, dataset=None): +def select_model(args: dict, is_continual_training: bool = None, storage_path: str = None): isinstance(args, dict) assert len(args) > 0 assert isinstance(is_continual_training, bool) @@ -61,7 +58,7 @@ def select_model(args: dict, is_continual_training: bool = None, storage_path: s print(f"{storage_path}/model.pt is not found. The model will be trained with random weights") return model, _ else: - return intialize_model(args, dataset) + return intialize_model(args) def load_model(path_of_experiment_folder, model_name='model.pt') -> Tuple[object, dict, dict]: @@ -197,13 +194,11 @@ def save_checkpoint_model(model, path: str) -> None: print(model.name) print('Could not save the model correctly') else: - # Pykeen torch.save(model.model.state_dict(), path) def store(trainer, - trained_model, model_name: str = 'model', full_storage_path: str = None, - dataset=None, save_embeddings_as_csv=False) -> None: + trained_model, model_name: str = 'model', full_storage_path: str = None, save_embeddings_as_csv=False) -> None: """ Store trained_model model and save embeddings into csv file. :param trainer: an instance of trainer class @@ -274,7 +269,9 @@ def read_or_load_kg(args, cls): print('*** Read or Load Knowledge Graph ***') start_time = time.time() kg = cls(data_dir=args.path_dataset_folder, - absolute_path_dataset=args.absolute_path_dataset, + add_noise_rate=args.add_noise_rate, + sparql_endpoint=args.sparql_endpoint, + path_single_kg=args.path_single_kg, add_reciprical=args.apply_reciprical_or_noise, eval_model=args.eval_model, read_only_few=args.read_only_few, @@ -288,29 +285,12 @@ def read_or_load_kg(args, cls): return kg -def get_pykeen_model(model_name: str, args, dataset): - if dataset is None: - # (1) Load a pretrained Pykeen Model - return PykeenKGE(dataset=EagerDataset( - training=TriplesFactory( - load_numpy(args['full_storage_path'] + '/train_set.npy'), - load_pickle(file_path=args['full_storage_path'] + '/entity_to_idx.p'), - load_pickle(file_path=args['full_storage_path'] + '/relation_to_idx.p')), - testing=None), args=args) - elif args['scoring_technique'] in ['KvsAll', "NegSample"]: - return PykeenKGE(dataset=EagerDataset( - training=TriplesFactory(dataset.train_set, dataset.entity_to_idx, dataset.relation_to_idx), - testing=None), args=args) - else: - raise NotImplementedError("Incorrect scoring technique") - - -def intialize_model(args: dict, dataset=None) -> Tuple[object, str]: +def intialize_model(args: dict) -> Tuple[object, str]: # @TODO: Apply construct_krone as callback? or use KronE_QMult as a prefix. # @TODO: Remove form_of_labelling model_name = args['model'] if "pykeen" in model_name.lower(): - model = get_pykeen_model(model_name, args, dataset) + model = PykeenKGE(args=args) form_of_labelling = "EntityPrediction" elif model_name == 'Shallom': model = Shallom(args=args) @@ -361,7 +341,7 @@ def intialize_model(args: dict, dataset=None) -> Tuple[object, str]: model = CMult(args=args) form_of_labelling = 'EntityPrediction' else: - raise ValueError + raise ValueError(f"--model_name: {model_name} is not found.") return model, form_of_labelling @@ -501,9 +481,9 @@ def continual_training_setup_executor(executor) -> None: executor.storage_path = executor.args.full_storage_path else: # Create a single directory containing KGE and all related data - if executor.args.absolute_path_to_store: - os.makedirs(executor.args.absolute_path_to_store, exist_ok=False) - executor.args.full_storage_path=executor.args.absolute_path_to_store + if executor.args.path_to_store_single_run: + os.makedirs(executor.args.path_to_store_single_run, exist_ok=False) + executor.args.full_storage_path=executor.args.path_to_store_single_run else: # Create a parent and subdirectory. executor.args.full_storage_path = create_experiment_folder(folder_name=executor.args.storage_path) diff --git a/dicee/static_funcs_training.py b/dicee/static_funcs_training.py index 86b86349..83bcbfbb 100644 --- a/dicee/static_funcs_training.py +++ b/dicee/static_funcs_training.py @@ -1,3 +1,114 @@ +import torch +from typing import Dict,Tuple,List +import numpy as np + + +def evaluate_lp(model, triple_idx, num_entities, er_vocab:Dict[Tuple,List], re_vocab:Dict[Tuple,List], info='Eval Starts'): + """ + Evaluate model in a standard link prediction task + + for each triple + the rank is computed by taking the mean of the filtered missing head entity rank and + the filtered missing tail entity rank + :param model: + :param triple_idx: + :param info: + :return: + """ + model.eval() + print(info) + print(f'Num of triples {len(triple_idx)}') + print('** Evaluation without batching') + hits = dict() + reciprocal_ranks = [] + # Iterate over test triples + all_entities = torch.arange(0, num_entities).long() + all_entities = all_entities.reshape(len(all_entities), ) + # Iterating one by one is not good when you are using batch norm + for i in range(0, len(triple_idx)): + # (1) Get a triple (head entity, relation, tail entity + data_point = triple_idx[i] + h, r, t = data_point[0], data_point[1], data_point[2] + + # (2) Predict missing heads and tails + x = torch.stack((torch.tensor(h).repeat(num_entities, ), + torch.tensor(r).repeat(num_entities, ), + all_entities), dim=1) + + predictions_tails = model.forward_triples(x) + x = torch.stack((all_entities, + torch.tensor(r).repeat(num_entities, ), + torch.tensor(t).repeat(num_entities) + ), dim=1) + + predictions_heads = model.forward_triples(x) + del x + + # 3. Computed filtered ranks for missing tail entities. + # 3.1. Compute filtered tail entity rankings + filt_tails = er_vocab[(h, r)] + # 3.2 Get the predicted target's score + target_value = predictions_tails[t].item() + # 3.3 Filter scores of all triples containing filtered tail entities + predictions_tails[filt_tails] = -np.Inf + # 3.4 Reset the target's score + predictions_tails[t] = target_value + # 3.5. Sort the score + _, sort_idxs = torch.sort(predictions_tails, descending=True) + sort_idxs = sort_idxs.detach() + filt_tail_entity_rank = np.where(sort_idxs == t)[0][0] + + # 4. Computed filtered ranks for missing head entities. + # 4.1. Retrieve head entities to be filtered + filt_heads = re_vocab[(r, t)] + # 4.2 Get the predicted target's score + target_value = predictions_heads[h].item() + # 4.3 Filter scores of all triples containing filtered head entities. + predictions_heads[filt_heads] = -np.Inf + predictions_heads[h] = target_value + _, sort_idxs = torch.sort(predictions_heads, descending=True) + sort_idxs = sort_idxs.detach() + filt_head_entity_rank = np.where(sort_idxs == h)[0][0] + + # 4. Add 1 to ranks as numpy array first item has the index of 0. + filt_head_entity_rank += 1 + filt_tail_entity_rank += 1 + + rr = 1.0 / filt_head_entity_rank + (1.0 / filt_tail_entity_rank) + # 5. Store reciprocal ranks. + reciprocal_ranks.append(rr) + # print(f'{i}.th triple: mean reciprical rank:{rr}') + + # 4. Compute Hit@N + for hits_level in range(1, 11): + res = 1 if filt_head_entity_rank <= hits_level else 0 + res += 1 if filt_tail_entity_rank <= hits_level else 0 + if res > 0: + hits.setdefault(hits_level, []).append(res) + + mean_reciprocal_rank = sum(reciprocal_ranks) / (float(len(triple_idx) * 2)) + + if 1 in hits: + hit_1 = sum(hits[1]) / (float(len(triple_idx) * 2)) + else: + hit_1 = 0 + + if 3 in hits: + hit_3 = sum(hits[3]) / (float(len(triple_idx) * 2)) + else: + hit_3 = 0 + + if 10 in hits: + hit_10 = sum(hits[10]) / (float(len(triple_idx) * 2)) + else: + hit_10 = 0 + + results = {'H@1': hit_1, 'H@3': hit_3, 'H@10': hit_10, + 'MRR': mean_reciprocal_rank} + print(results) + return results + + def efficient_zero_grad(model): # Use this instead of # self.optimizer.zero_grad() @@ -5,3 +116,4 @@ def efficient_zero_grad(model): # https://pytorch.org/tutorials/recipes/recipes/tuning_guide.html#use-parameter-grad-none-instead-of-model-zero-grad-or-optimizer-zero-grad for param in model.parameters(): param.grad = None + diff --git a/dicee/static_preprocess_funcs.py b/dicee/static_preprocess_funcs.py index 2c09a929..4da54a46 100644 --- a/dicee/static_preprocess_funcs.py +++ b/dicee/static_preprocess_funcs.py @@ -44,12 +44,13 @@ def preprocesses_input_args(args): assert args.init_param in ['xavier_normal', None] - # Below part will be investigated - #args.check_val_every_n_epoch = 10 ** 6 # ,i.e., no eval + # No need to eval. Investigate runtime performance + args.check_val_every_n_epoch = 10 ** 6 # ,i.e., no eval + assert args.add_noise_rate is None or isinstance(args.add_noise_rate, float) args.logger = False try: assert args.eval_model in [None, 'None', 'train', 'val', 'test', 'train_val', 'train_test', 'val_test', - 'train_val_test'] + 'train_val_test'] except AssertionError: raise AssertionError(f'Unexpected input for eval_model ***\t{args.eval_model}\t***') @@ -105,7 +106,8 @@ def create_constraints(triples: np.ndarray) -> Tuple[dict, dict, dict, dict]: set_of_entities.add(e1) set_of_relations.add(p) set_of_entities.add(e2) - print(f'Creating constraints based on {len(set_of_relations)} relations and {len(set_of_entities)} entities...', end='\t') + print(f'Creating constraints based on {len(set_of_relations)} relations and {len(set_of_entities)} entities...', + end='\t') for rel in set_of_relations: range_constraints_per_rel[rel] = list(set_of_entities - range_per_rel[rel]) domain_constraints_per_rel[rel] = list(set_of_entities - domain_per_rel[rel]) diff --git a/dicee/trainer/dice_trainer.py b/dicee/trainer/dice_trainer.py index 78c45cdd..d2367fec 100644 --- a/dicee/trainer/dice_trainer.py +++ b/dicee/trainer/dice_trainer.py @@ -3,7 +3,7 @@ from typing import Union from dicee.models.base_model import BaseKGE from dicee.static_funcs import select_model -from dicee.callbacks import PPE, FPPE, Eval, KronE, PrintCallback, KGESaveCallback, AccumulateEpochLossCallback, GN +from dicee.callbacks import PPE, FPPE, Eval, KronE, PrintCallback, KGESaveCallback, AccumulateEpochLossCallback, GN, RN from dicee.dataset_classes import construct_dataset, reload_dataset from .torch_trainer import TorchTrainer from .torch_trainer_ddp import TorchDDPTrainer @@ -31,7 +31,6 @@ def initialize_trainer(args, callbacks): return TorchTrainer(args, callbacks=callbacks) elif args.trainer == 'PL': print('Initializing Pytorch-lightning Trainer', end='\t') - # Pytest with PL problem https://github.com/pytest-dev/pytest/discussions/7995 return pl.Trainer.from_argparse_args(args, callbacks=callbacks, strategy=DDPStrategy(find_unused_parameters=False)) @@ -52,6 +51,8 @@ def get_callbacks(args): for k, v in args.callbacks.items(): if k == "GN": callbacks.append(GN(std=v['std'], epoch_ratio=v.get('epoch_ratio'))) + elif k=='RN': + callbacks.append(RN(std=v['std'], epoch_ratio=v.get('epoch_ratio'))) elif k == 'FPP': callbacks.append( FPPE(num_epochs=args.num_epochs, path=args.full_storage_path, @@ -91,7 +92,7 @@ class DICE_Trainer: report:dict """ - def __init__(self, args, is_continual_training, storage_path, evaluator=None, dataset=None): + def __init__(self, args, is_continual_training, storage_path, evaluator=None): self.report = dict() self.args = args self.trainer = None @@ -100,7 +101,6 @@ def __init__(self, args, is_continual_training, storage_path, evaluator=None, da # Required for CV. self.evaluator = evaluator self.form_of_labelling = None - self.dataset = dataset print( f'# of CPUs:{os.cpu_count()} | # of GPUs:{torch.cuda.device_count()} | # of CPUs for dataloader:{self.args.num_core}') @@ -143,8 +143,7 @@ def initialize_trainer(self, callbacks: List, plugins: List) -> pl.Trainer: @timeit def initialize_or_load_model(self): print('Initializing Model...', end='\t') - model, form_of_labelling = select_model(vars(self.args), self.is_continual_training, self.storage_path, - self.dataset) + model, form_of_labelling = select_model(vars(self.args), self.is_continual_training, self.storage_path) self.report['form_of_labelling'] = form_of_labelling assert form_of_labelling in ['EntityPrediction', 'RelationPrediction'] return model, form_of_labelling diff --git a/dicee/trainer/torch_trainer.py b/dicee/trainer/torch_trainer.py index 39e23b4e..1eaba6f1 100644 --- a/dicee/trainer/torch_trainer.py +++ b/dicee/trainer/torch_trainer.py @@ -25,10 +25,10 @@ def __init__(self, args, callbacks): self.loss_function = None self.optimizer = None self.model = None - self.is_global_zero = True self.train_dataloaders = None - torch.manual_seed(self.attributes.seed_for_computation) - torch.cuda.manual_seed_all(self.attributes.seed_for_computation) + self.training_step=None + torch.manual_seed(self.attributes.random_seed) + torch.cuda.manual_seed_all(self.attributes.random_seed) if self.attributes.gpus and torch.cuda.is_available(): self.device = torch.device(f'cuda:{self.attributes.gpus}' if torch.cuda.is_available() else 'cpu') else: @@ -57,7 +57,7 @@ def _run_batch(self, i: int, x_batch, y_batch) -> float: # (2) Do not accumulate gradient, zero the gradients per batch. self.optimizer.zero_grad(set_to_none=True) # (3) Loss Forward and Backward w.r.t the batch. - return self.compute_forward_loss_backward(x_batch, y_batch).item() + return self.compute_forward_loss_backward(x_batch, y_batch) def _run_epoch(self, epoch: int) -> float: """ @@ -121,6 +121,7 @@ def fit(self, *args, train_dataloaders, **kwargs) -> None: self.train_dataloaders = train_dataloaders self.loss_function = model.loss_function self.optimizer = self.model.configure_optimizers() + self.training_step=self.model.training_step # (1) Start running callbacks self.on_fit_start(self, self.model) @@ -138,7 +139,7 @@ def fit(self, *args, train_dataloaders, **kwargs) -> None: avg_epoch_loss = self._run_epoch(epoch) print(f"Epoch:{epoch + 1} " f"| Loss:{avg_epoch_loss:.8f} " - f"| Runtime:{(time.time() - start_time) / 60:.3f}mins") + f"| Runtime:{(time.time() - start_time) / 60:.3f} mins") # Autobatch Finder: Double the current batch size if memory allows and repeat this process at mast 5 times. if self.attributes.auto_batch_finder and psutil.virtual_memory().percent < 30.0 and counter < 5: self.train_dataloaders = DataLoader(dataset=self.train_dataloaders.dataset, @@ -176,15 +177,10 @@ def compute_forward_loss_backward(self, x_batch: torch.Tensor, y_batch: torch.Te batch_loss = self.optimizer.step(closure=lambda: self.loss_function(self.model(x_batch), y_batch)) return batch_loss else: - # (1) Forward and Backpropagate the gradient of (3) w.r.t. parameters. - yhat_batch = self.model(x_batch) - # (2) Compute the batch loss - batch_loss = self.loss_function(yhat_batch, y_batch) - # (3) Backward pass + batch_loss=self.training_step(batch=(x_batch, y_batch)) batch_loss.backward() - # (4) Parameter update self.optimizer.step() - return batch_loss + return batch_loss.item() def extract_input_outputs_set_device(self, batch: list) -> Tuple: """ diff --git a/environment.yml b/environment.yml deleted file mode 100644 index f669c9b1..00000000 --- a/environment.yml +++ /dev/null @@ -1,135 +0,0 @@ -name: daikiri -channels: - - defaults -dependencies: - - _libgcc_mutex=0.1=main - - _openmp_mutex=4.5=1_gnu - - ca-certificates=2021.10.26=h06a4308_2 - - certifi=2021.10.8=py39h06a4308_2 - - ld_impl_linux-64=2.35.1=h7274673_9 - - libffi=3.3=he6710b0_2 - - libgcc-ng=9.3.0=h5101ec6_17 - - libgomp=9.3.0=h5101ec6_17 - - libstdcxx-ng=9.3.0=hd4cf53a_17 - - ncurses=6.3=h7f8727e_2 - - openssl=1.1.1m=h7f8727e_0 - - pip=21.2.4=py39h06a4308_0 - - python=3.9.7=h12debd9_1 - - readline=8.1.2=h7f8727e_1 - - sqlite=3.37.0=hc218d9a_0 - - tk=8.6.11=h1ccaba5_0 - - tzdata=2021e=hda174b7_0 - - wheel=0.37.1=pyhd3eb1b0_0 - - xz=5.2.5=h7b6447c_0 - - zlib=1.2.11=h7f8727e_4 - - pip: - - absl-py==1.0.0 - - aiohttp==3.8.1 - - aiosignal==1.2.0 - - analytics-python==1.4.0 - - anyio==3.5.0 - - asdfghjkl==0.1a2 - - asgiref==3.5.0 - - async-timeout==4.0.2 - - attrs==21.4.0 - - backoff==1.10.0 - - backpack-for-pytorch==1.4.0 - - bcrypt==3.2.0 - - bokeh==2.4.2 - - cachetools==5.0.0 - - cffi==1.15.0 - - charset-normalizer==2.0.11 - - click==8.0.3 - - cloudpickle==2.0.0 - - cryptography==36.0.1 - - cycler==0.11.0 - - dask==2022.1.0 - - distributed==2022.1.0 - - einops==0.4.0 - - fastapi==0.73.0 - - ffmpy==0.3.0 - - fonttools==4.29.1 - - frozenlist==1.3.0 - - fsspec==2022.1.0 - - future==0.18.2 - - google-auth==2.6.0 - - google-auth-oauthlib==0.4.6 - - gradio==2.7.5.2 - - grpcio==1.43.0 - - h11==0.13.0 - - heapdict==1.0.1 - - idna==3.3 - - importlib-metadata==4.10.1 - - iniconfig==1.1.1 - - isodate==0.6.1 - - jinja2==3.0.3 - - joblib==1.1.0 - - kiwisolver==1.3.2 - - laplace-torch==0.1a2 - - locket==0.2.1 - - markdown==3.3.6 - - markdown2==2.4.2 - - markupsafe==2.0.1 - - matplotlib==3.5.1 - - monotonic==1.6 - - msgpack==1.0.3 - - multidict==6.0.2 - - numpy==1.22.2 - - oauthlib==3.2.0 - - packaging==21.3 - - pandas==1.4.0 - - paramiko==2.9.2 - - partd==1.2.0 - - pillow==9.0.1 - - pluggy==1.0.0 - - protobuf==3.19.4 - - psutil==5.9.0 - - py==1.11.0 - - pyarrow==6.0.1 - - pyasn1==0.4.8 - - pyasn1-modules==0.2.8 - - pycparser==2.21 - - pycryptodome==3.14.1 - - pydantic==1.9.0 - - pydeprecate==0.3.1 - - pydub==0.25.1 - - pynacl==1.5.0 - - pyparsing==3.0.7 - - pytest==6.2.5 - - python-dateutil==2.8.2 - - python-multipart==0.0.5 - - pytorch-lightning==1.5.9 - - pytz==2021.3 - - pyyaml==6.0 - - rdflib==6.1.1 - - requests==2.27.1 - - requests-oauthlib==1.3.1 - - rsa==4.8 - - scikit-learn==1.0.2 - - scipy==1.7.3 - - setuptools==59.5.0 - - six==1.16.0 - - sniffio==1.2.0 - - sortedcontainers==2.4.0 - - starlette==0.17.1 - - tblib==1.7.0 - - tensorboard==2.8.0 - - tensorboard-data-server==0.6.1 - - tensorboard-plugin-wit==1.8.1 - - threadpoolctl==3.1.0 - - toml==0.10.2 - - toolz==0.11.2 - - torch==1.10.2 - - torchaudio==0.10.2 - - torchmetrics==0.7.1 - - torchvision==0.11.3 - - tornado==6.1 - - tqdm==4.62.3 - - typing-extensions==4.0.1 - - urllib3==1.26.8 - - uvicorn==0.17.4 - - werkzeug==2.0.2 - - yarl==1.7.2 - - zict==2.0.0 - - zipp==3.7.0 -prefix: /home/demir/anaconda3/envs/daikiri diff --git a/examples/script_model_paralelisim.py b/examples/script_model_paralelisim.py new file mode 100644 index 00000000..a0e565a4 --- /dev/null +++ b/examples/script_model_paralelisim.py @@ -0,0 +1,117 @@ +import torch +import dicee +from dicee import DistMult +from dicee import NegSampleDataset +import polars as pl +import time +import pandas as pd +import numpy as np + +if True: + print("Reading KG...") + start_time = time.time() + data = pl.read_parquet("dbpedia-2022-12-nt.parquet.snappy",n_rows=10000) +#data = pl.read_csv("KGs/UMLS/train.txt", +# has_header=False, +# low_memory=False, +# columns=[0, 1, 2], +# dtypes=[pl.Utf8], # str +# new_columns=['subject', 'relation', 'object'], +# separator="\t") + print(f"took {time.time() - start_time}") + print("Unique entities...") + start_time = time.time() + unique_entities = pl.concat((data.get_column('subject'), data.get_column('object'))).unique().rename('entity').to_list() + print(f"took {time.time() - start_time}") + + print("Unique relations...") + start_time = time.time() + unique_relations = data.unique(subset=["relation"]).select("relation").to_series().to_list() + print(f"took {time.time() - start_time}") + + print("Entity index mapping...") + start_time = time.time() + entity_to_idx = {ent: idx for idx, ent in enumerate(unique_entities)} + print(f"took {time.time() - start_time}") + + print("Relation index mapping...") + start_time = time.time() + rel_to_idx = {rel: idx for idx, rel in enumerate(unique_relations)} + print(f"took {time.time() - start_time}") + + print("Constructing training data...") + start_time = time.time() + data = data.with_columns(pl.col("subject").map_dict(entity_to_idx).alias("subject"), + pl.col("relation").map_dict(rel_to_idx).alias("relation"), + pl.col("object").map_dict(entity_to_idx).alias("object")).to_numpy() + print(f"took {time.time() - start_time}") + + num_entities=len(unique_entities) + num_relations=len(unique_relations) + + + + with open("data.npy", 'wb') as f: + np.save(f, data) +else: + with open("data.npy", 'rb') as f: + data = np.load(f) + + num_entities=1+max(max(data[:,0]),max(data[:,2])) + num_relations=1+max(data[:,1]) + +data = NegSampleDataset(train_set=data, + num_entities=num_entities, num_relations=num_relations, + neg_sample_ratio=1.0) +data = torch.utils.data.DataLoader(data, batch_size=1024, shuffle=True) +print("KGE model...") +start_time = time.time() +model1 = DistMult(args={"optim":"SGD","num_entities": num_entities, "num_relations": num_relations,"embedding_dim": 10, 'learning_rate': 0.001}) +print(model1) +model2 = DistMult(args={"optim":"SGD","num_entities": num_entities, "num_relations": num_relations,"embedding_dim": 10, 'learning_rate': 0.001}) +print(model2) + +#model1 = torch.compile(model1) +#model2 = torch.compile(model2) +# TODO: We need to cpu-offlaoding +# Do not sent the eintiere model to GPU but only the batch +#model1.to(device=torch.device("cuda:0"),dtype=torch.float16) +#model2.to(device=torch.device("cuda:1"),dtype=torch.float16) + + +print(f"took {time.time() - start_time}") + +print("Optimizer...") +start_time = time.time() +optim1 = model1.configure_optimizers() +optim2 = model2.configure_optimizers() + +from tqdm import tqdm + + +loss_function = model1.loss_function +print("Training...") +for e in range(1): + epoch_loss = 0 + + for (x, y) in tqdm(data): + # if we have space in GPU, get the next batch + x = x.flatten(start_dim=0, end_dim=1) + y = y.flatten(start_dim=0, end_dim=1) + optim1.zero_grad(set_to_none=True) + optim2.zero_grad(set_to_none=True) + + start_time=time.time() + # CPU + + yhat=(model1.score(*model1.get_triple_representation(x)) + model2.score(*model2.get_triple_representation(x)))/2 + + batch_positive_loss = loss_function(yhat, y) + epoch_loss += batch_positive_loss.item() + batch_positive_loss.backward() + optim1.step() + optim2.step() + + print(epoch_loss / len(data)) + +print('DONE') diff --git a/main.py b/main.py index 14051d66..89b59414 100755 --- a/main.py +++ b/main.py @@ -5,37 +5,45 @@ from dicee.config import ParseDict import argparse + def get_default_arguments(description=None): """ Extends pytorch_lightning Trainer's arguments with ours """ parser = pl.Trainer.add_argparse_args(argparse.ArgumentParser(add_help=False)) # Default Trainer param https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html#methods # Data related arguments - parser.add_argument("--path_dataset_folder", type=str, default="KGs/UMLS", - help="The path of a folder containing input data") - parser.add_argument("--save_embeddings_as_csv", action="store_false", - help='A flag for saving embeddings in csv file.') + parser.add_argument("--path_dataset_folder", type=str, default=None, + help="The path of a folder containing train.txt, and/or valid.txt and/or test.txt" + ",e.g., KGs/UMLS") + parser.add_argument("--sparql_endpoint", type=str, default=None, + help="An endpoint of a triple store, e.g. 'http://localhost:3030/mutagenesis/'. ") + parser.add_argument("--path_single_kg", type=str, default=None, + help="Path of a file corresponding to the input knowledge graph") + parser.add_argument("--path_to_store_single_run", type=str, default=None, + help="A single directory created that contains related data about embeddings.") parser.add_argument("--storage_path", type=str, default='Experiments', - help="Embeddings, model, and any other related data will be stored therein.") - parser.add_argument("--absolute_path_to_store", type=str, default=None, - help="A directory will be created in a given path,e.g., os.getcwd() + '/Dummy')") - parser.add_argument("--absolute_path_dataset", type=str, default=None, - help="Absolute path of a file corresponding to the input knowledge graph") + help="A directory named with time of execution under --storage_path " + "that contains related data about embeddings.") + parser.add_argument("--save_embeddings_as_csv", action="store_true", + help="A flag for saving embeddings in csv file.") # Model related arguments parser.add_argument("--model", type=str, default="Keci", choices=["ConEx", "AConEx", "ConvQ", "AConvQ", "ConvO", "AConvO", "QMult", "OMult", "Shallom", "DistMult", "TransE", "ComplEx", "Keci", - "Pykeen_QuatE", "Pykeen_MuRE", "Pykeen_BoxE"], + "Pykeen_QuatE", "Pykeen_DistMult", "Pykeen_BoxE", "Pykeen_CP", + "Pykeen_HolE", "Pykeen_ProjE", "Pykeen_RotatE", + "Pykeen_TransE", "Pykeen_TransF", "Pykeen_TransH", + "Pykeen_TransR", "Pykeen_TuckER", "Pykeen_ComplEx"], help="Available knowledge graph embedding models. " "To use other knowledge graph embedding models available in python, e.g.," "**Pykeen_BoxE** and add this into choices") parser.add_argument('--optim', type=str, default='Adam', help='An optimizer', choices=['Adam', 'SGD']) - parser.add_argument('--embedding_dim', type=int, default=32, + parser.add_argument('--embedding_dim', type=int, default=64, help='Number of dimensions for an embedding vector. ') - parser.add_argument("--num_epochs", type=int, default=100, help='Number of epochs for training. ') - parser.add_argument('--batch_size', type=int, default=1024, help='Mini batch size') + parser.add_argument("--num_epochs", type=int, default=10, help='Number of epochs for training. ') + parser.add_argument('--batch_size', type=int, default=256, help='Mini batch size') parser.add_argument("--lr", type=float, default=0.1) parser.add_argument('--callbacks', type=json.loads, default={}, help=' {"PPE":{ "last_percent_to_consider": 10}}, {"GN": {"std":0.1}}') @@ -48,7 +56,7 @@ def get_default_arguments(description=None): parser.add_argument('--scoring_technique', default='KvsAll', help="Training technique for knowledge graph embedding model", choices=["KvsAll", "1vsAll", "NegSample", "KvsSample"]) - parser.add_argument('--neg_ratio', type=int, default=1, + parser.add_argument('--neg_ratio', type=int, default=0, help='The number of negative triples generated per positive triple.') parser.add_argument('--weight_decay', type=float, default=0.0, help='L2 penalty e.g.(0.00001)') parser.add_argument('--input_dropout_rate', type=float, default=0.0) @@ -66,7 +74,7 @@ def get_default_arguments(description=None): help='Number of folds in k-fold cross validation.' 'If >2 ,no evaluation scenario is applied implies no evaluation.') parser.add_argument("--eval_model", type=str, default="train_val_test", - choices=["train", "train_val", "train_val_tset", "test"], + choices=["None", "train", "train_val", "train_val_test", "test"], help='Evaluating link prediction performance on data splits. ') parser.add_argument("--save_model_at_every_epoch", type=int, default=None, help='At every X number of epochs model will be saved. If None, we save 4 times.') @@ -74,13 +82,15 @@ def get_default_arguments(description=None): parser.add_argument("--kernel_size", type=int, default=3, help="Square kernel size for convolution based models.") parser.add_argument("--num_of_output_channels", type=int, default=2, help="# of output channels in convolution") - parser.add_argument("--num_core", type=int, default=4, + parser.add_argument("--num_core", type=int, default=1, help='Number of cores to be used. 0 implies using single CPU') - parser.add_argument("--seed_for_computation", type=int, default=0, + parser.add_argument("--random_seed", type=int, default=0, help='Seed for all, see pl seed_everything().') parser.add_argument("--sample_triples_ratio", type=float, default=None, help='Sample input data.') parser.add_argument("--read_only_few", type=int, default=None, help='READ only first N triples. If 0, read all.') + parser.add_argument("--add_noise_rate", type=float, default=0.0, + help='Add x % of noisy triples into training dataset.') parser.add_argument('--p', type=int, default=0, help='P for Clifford Algebra') parser.add_argument('--q', type=int, default=0, diff --git a/requirements.txt b/requirements.txt index b30214a0..2f11d8c3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,13 +1,13 @@ -pandas>=1.5.1 torch>=2.0.0 -torch --index-url https://download.pytorch.org/whl/cpu +pandas>=1.5.1 polars>=0.16.14 scikit-learn>=1.2.2 pyarrow>=11.0.0 -pytest>=7.2.2 -gradio>=3.23.0 -psutil>=5.9.4 pytorch-lightning==1.6.4 pykeen==1.10.1 zstandard>=0.21.0 -ruff \ No newline at end of file +pytest>=7.2.2 +psutil>=5.9.4 +ruff>=0.0.284 +gradio>=3.23.0 +rdflib>=7.0.0 diff --git a/setup.py b/setup.py index 340003ec..7eb90e39 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ setup( name='dicee', description='Dice embedding is an hardware-agnostic framework for large-scale knowledge graph embedding applications', - version='0.0.4', + version='0.0.5', packages=find_packages(), install_requires=[ "torch>=2.0.0", @@ -17,7 +17,9 @@ "zstandard>=0.21.0", "pytest>=7.2.2", "psutil>=5.9.4", - "gradio>=3.23.0"], + "ruff>=0.0.284", + "gradio>=3.23.0", + "rdflib>=7.0.0"], author='Caglar Demir', author_email='caglardemir8@gmail.com', url='https://github.com/dice-group/dice-embeddings', diff --git a/tests/test_pykeen.py b/tests/test_pykeen.py index 4edaab3a..835ef0ee 100644 --- a/tests/test_pykeen.py +++ b/tests/test_pykeen.py @@ -16,58 +16,52 @@ def template(model_name): args.num_core = 1 args.scoring_technique = "KvsAll" args.num_epochs = 10 - args.pykeen_model_kwargs = {"embedding_dim": 64} args.sample_triples_ratio = None args.read_only_few = None args.num_folds_for_cv = None return args -# @pytest.mark.parametrize( -# "model_name", -# [ -# "Pykeen_DistMult", -# "Pykeen_ComplEx", -# "Pykeen_QuatE", -# "Pykeen_MuRE", -# # "Pykeen_HolE " -# ], -# ) -# @pytest.mark.filterwarnings("ignore::UserWarning") -# def test_model(model_name): -# args = template(model_name) -# result = Execute(args).start() -# if args.model == "Pykeen_DistMult": -# assert 0.84 >= result["Train"]["MRR"] >= 0.800 -# elif args.model == "Pykeen_ComplEx": -# assert 0.92 >= result["Train"]["MRR"] >= 0.88 -# elif args.model == "Pykeen_QuatE": -# assert 0.999 >= result["Train"]["MRR"] >= 0.94 -# elif args.model == "Pykeen_MuRE": -# assert 0.88 >= result["Train"]["MRR"] >= 0.82 - -@pytest.mark.parametrize("model_name", ["Pykeen_DistMult", - "Pykeen_ComplEx", - "Pykeen_QuatE", - "Pykeen_MuRE", ]) +@pytest.mark.parametrize("model_name", ["Pykeen_DistMult", "Pykeen_ComplEx", "Pykeen_HolE", "Pykeen_CP", + "Pykeen_ProjE", "Pykeen_TuckER", "Pykeen_TransR", "Pykeen_TransH", + "Pykeen_TransD", "Pykeen_TransE", "Pykeen_QuatE", "Pykeen_MuRE", + "Pykeen_BoxE", "Pykeen_RotatE"]) class TestClass: def test_defaultParameters_case(self, model_name): args = template(model_name) result = Execute(args).start() if args.model == "Pykeen_DistMult": - assert 0.84 >= result["Train"]["MRR"] >= 0.800 + assert 0.84 >= result["Train"]["MRR"] >= 0.79 elif args.model == "Pykeen_ComplEx": - assert 0.92 >= result["Train"]["MRR"] >= 0.88 + assert 0.92 >= result["Train"]["MRR"] >= 0.77 elif args.model == "Pykeen_QuatE": - assert 0.999 >= result["Train"]["MRR"] >= 0.94 + assert 0.999 >= result["Train"]["MRR"] >= 0.84 elif args.model == "Pykeen_MuRE": - assert 0.89 >= result["Train"]["MRR"] >= 0.82 + assert 0.89 >= result["Train"]["MRR"] >= 0.85 + elif args.model == "Pykeen_BoxE": + assert 0.85 >= result["Train"]["MRR"] >= 0.78 + elif args.model == "Pykeen_RotatE": + assert 0.67 >= result["Train"]["MRR"] >= 0.64 + elif args.model == "Pykeen_CP": # 1.5M params + assert 1.00 >= result["Train"]["MRR"] >= 0.99 + elif args.model == "Pykeen_HolE": # 14.k params + assert 0.89 >= result["Train"]["MRR"] >= 0.88 + elif args.model == "Pykeen_ProjE": # 14.k params + assert 0.88 >= result["Train"]["MRR"] >= 0.78 + elif args.model == "Pykeen_TuckER": # 276.k params + assert 0.36 >= result["Train"]["MRR"] >= 0.31 + elif args.model == "Pykeen_TransR": # 188.k params + assert 0.72 >= result["Train"]["MRR"] >= 0.66 + elif args.model == "Pykeen_TransF": # 14.5 k params + assert 0.17 >= result["Train"]["MRR"] >= 0.16 + elif args.model == "Pykeen_TransH": # 20.4 k params + assert 0.69 >= result["Train"]["MRR"] >= 0.60 + elif args.model == "Pykeen_TransD": # 29.1 k params + assert 0.73 >= result["Train"]["MRR"] >= 0.60 + elif args.model == "Pykeen_TransE": # 29.1 k params + assert 0.45 >= result["Train"]["MRR"] >= 0.40 def test_GNCallback_case(self, model_name): args = template(model_name) args.callbacks = {'GN': {"std": 0.1}} - Execute(args).start() - - - - + Execute(args).start() \ No newline at end of file