Skip to content

Commit

Permalink
[SPIKE] Test validate dataset classes in catalog using LSP (#159)
Browse files Browse the repository at this point in the history
* spike lsp

Signed-off-by: Sajid Alam <[email protected]>

* fix diagnostic range

Signed-off-by: Sajid Alam <[email protected]>

* more verbose error messages

Signed-off-by: Sajid Alam <[email protected]>

* scan all files

Signed-off-by: Sajid Alam <[email protected]>

* skip dataset names starting with _

Signed-off-by: Sajid Alam <[email protected]>

* Update lsp_server.py

Signed-off-by: Sajid Alam <[email protected]>

* check catalog live while writing and add periodic revalidation

Signed-off-by: Sajid Alam <[email protected]>

* only look at catalog files

Signed-off-by: Sajid Alam <[email protected]>

* Update lsp_server.py

Signed-off-by: Sajid Alam <[email protected]>

---------

Signed-off-by: Sajid Alam <[email protected]>
  • Loading branch information
SajidAlamQB authored Jan 24, 2025
1 parent 8f02004 commit 8f019d2
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 2 deletions.
234 changes: 232 additions & 2 deletions bundled/tool/lsp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@
from __future__ import annotations

import glob
import importlib
import json
import logging
import os
import pathlib
import re
import sys
import asyncio
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict, Tuple, Optional

from common import update_sys_path

from kedro.io import DataCatalog

# **********************************************************
# Update sys.path before importing any bundled libraries.
# **********************************************************
Expand Down Expand Up @@ -46,6 +50,8 @@
TEXT_DOCUMENT_HOVER,
TEXT_DOCUMENT_REFERENCES,
WORKSPACE_DID_CHANGE_CONFIGURATION,
TEXT_DOCUMENT_DID_OPEN,
TEXT_DOCUMENT_DID_CHANGE,
CompletionItem,
CompletionList,
CompletionOptions,
Expand All @@ -59,6 +65,17 @@
Position,
Range,
TextDocumentPositionParams,
DidOpenTextDocumentParams,
DidChangeTextDocumentParams,
Diagnostic,
DiagnosticSeverity,
DidChangeWatchedFilesParams,
FileChangeType,
RegistrationParams,
Registration,
DidChangeWatchedFilesRegistrationOptions,
FileSystemWatcher,
WatchKind,
)
from pygls import uris, workspace
from pygls.workspace import TextDocument
Expand Down Expand Up @@ -182,6 +199,30 @@ async def initialize(params: lsp.InitializeParams) -> None:
)
_check_project()

# After initialisation, validate all catalog files
await validate_all_catalogs(LSP_SERVER)

# Start periodic revalidation
asyncio.create_task(periodic_revalidation(LSP_SERVER))

# Set up file watchers for catalog files
catalog_pattern = FileSystemWatcher(
glob_pattern="**/catalog*.y?(a)ml",
kind=(WatchKind.Create | WatchKind.Change | WatchKind.Delete)
)
await LSP_SERVER.register_capability_async(
RegistrationParams(
registrations=[
Registration(
id="catalogWatcher",
method="workspace/didChangeWatchedFiles",
register_options=DidChangeWatchedFilesRegistrationOptions(
watchers=[catalog_pattern]
),
)
]
)
)

### Kedro LSP logic
def _get_conf_paths(server: KedroLanguageServer, key):
Expand Down Expand Up @@ -481,6 +522,195 @@ def did_change_configuration(
"""


@LSP_SERVER.feature(TEXT_DOCUMENT_DID_OPEN)
async def did_open(ls: KedroLanguageServer, params: DidOpenTextDocumentParams):
    """Run catalog validation on a document as soon as the client opens it.

    Documents are ignored unless their file name begins with ``catalog`` and
    their extension is ``.yml`` or ``.yaml``.

    Args:
        ls: Language server whose workspace holds the opened document.
        params: Notification payload carrying the opened document's URI.
    """
    uri = params.text_document.uri
    opened_path = pathlib.Path(uris.to_fs_path(uri))

    # Guard clauses: anything that is not catalog*.yml / catalog*.yaml is skipped.
    if not opened_path.name.startswith("catalog"):
        return
    if opened_path.suffix not in {".yml", ".yaml"}:
        return

    text = ls.workspace.get_text_document(uri).source
    await validate_catalog_content(ls, uri, text)


@LSP_SERVER.feature(TEXT_DOCUMENT_DID_CHANGE)
async def did_change(ls: KedroLanguageServer, params: DidChangeTextDocumentParams):
    """Re-validate a catalog document each time the client reports an edit.

    The text validated is the workspace's current copy of the document, not
    the file on disk. Documents are ignored unless their file name begins
    with ``catalog`` and their extension is ``.yml`` or ``.yaml``.

    Args:
        ls: Language server whose workspace holds the edited document.
        params: Notification payload carrying the edited document's URI.
    """
    uri = params.text_document.uri
    edited_path = pathlib.Path(uris.to_fs_path(uri))

    is_catalog_yaml = (
        edited_path.name.startswith("catalog")
        and edited_path.suffix in {".yml", ".yaml"}
    )
    if is_catalog_yaml:
        # Use the in-memory buffer so unsaved edits are validated too.
        live_text = ls.workspace.get_text_document(uri).source
        await validate_catalog_content(ls, uri, live_text)


@LSP_SERVER.feature(lsp.WORKSPACE_DID_CHANGE_WATCHED_FILES)
async def did_change_watched_files(ls: KedroLanguageServer, params: DidChangeWatchedFilesParams):
    """React to file-system events reported for watched files.

    Created or changed files are validated from disk; for deleted files an
    empty diagnostics list is published so old markers disappear.

    Args:
        ls: Language server used to validate and publish diagnostics.
        params: Notification payload listing the file events.
    """
    needs_validation = (FileChangeType.Created, FileChangeType.Changed)
    for event in params.changes:
        event_kind = event.type
        if event_kind in needs_validation:
            await validate_catalog(ls, event.uri)
            continue
        if event_kind == FileChangeType.Deleted:
            # Nothing left to validate: wipe whatever was reported before.
            ls.publish_diagnostics(event.uri, [])


async def validate_all_catalogs(ls: KedroLanguageServer):
    """Validate every catalog file found under the workspace root.

    ``_check_project()`` is called first; nothing further happens when
    ``ls.is_kedro_project()`` is false.

    Args:
        ls: Language server providing the workspace root and project check.
    """
    _check_project()
    if ls.is_kedro_project():
        for catalog_uri in find_all_catalog_files(ls.workspace.root_path):
            await validate_catalog(ls, catalog_uri)


def find_all_catalog_files(root_path):
    """Collect URIs of all catalog YAML files below ``root_path``.

    A file qualifies when its name starts with ``catalog`` and ends with
    ``.yml`` or ``.yaml``. The directory tree is walked recursively.

    Args:
        root_path: Directory at which the walk starts.

    Returns:
        A list of URIs, one per matching file, in walk order.
    """
    return [
        uris.from_fs_path(os.path.join(folder, name))
        for folder, _subdirs, names in os.walk(root_path)
        for name in names
        if name.startswith('catalog') and name.endswith(('.yml', '.yaml'))
    ]


def remove_line_numbers(config):
    """Strip every ``'__line__'`` key from a nested dict/list structure.

    Dicts and lists are rebuilt recursively; any other value is returned
    unchanged.

    Args:
        config: Arbitrary parsed configuration value.

    Returns:
        The same structure without ``'__line__'`` dictionary entries.
    """
    if isinstance(config, list):
        return [remove_line_numbers(item) for item in config]
    if not isinstance(config, dict):
        return config

    cleaned = {}
    for key, value in config.items():
        if key == '__line__':
            continue
        cleaned[key] = remove_line_numbers(value)
    return cleaned


def _collect_dataset_diagnostics(content: str, catalog_config: dict) -> list:
    """Retry each top-level catalog entry alone and report the ones that fail.

    Args:
        content: Raw YAML text, used to locate the ``type:`` line of an entry.
        catalog_config: Parsed catalog mapping (may still hold '__line__' keys).

    Returns:
        One ``Diagnostic`` per failing entry whose ``type:`` line was found.
    """
    diagnostics = []
    for dataset_name, dataset_config in catalog_config.items():
        # Entries starting with "_" are skipped; YAML keys are not always strings.
        if isinstance(dataset_name, str) and dataset_name.startswith("_"):
            continue

        try:
            DataCatalog.from_config({dataset_name: remove_line_numbers(dataset_config)})
        except Exception as dataset_exception:
            # Broad on purpose: whatever the catalog raises becomes the message.
            line_info = find_line_number_and_character(content, dataset_name, "type")
            if line_info is None:
                continue
            line_number, start_char = line_info

            # A malformed entry (e.g. a bare scalar) has no ``.get``.
            if isinstance(dataset_config, dict):
                dataset_type = dataset_config.get("type", "Unknown")
            else:
                dataset_type = "Unknown"
            end_char = start_char + len(f"type: {dataset_type}")

            diagnostics.append(
                Diagnostic(
                    range=Range(
                        start=Position(line=line_number, character=start_char),
                        end=Position(line=line_number, character=end_char),
                    ),
                    message=f"Dataset '{dataset_name}' has an invalid type '{dataset_type}'. {dataset_exception}",
                    severity=DiagnosticSeverity.Error,
                    source="Kedro LSP",
                )
            )
    return diagnostics


async def validate_catalog_content(ls: KedroLanguageServer, uri: str, content: str):
    """Validate catalog YAML text and publish the diagnostics for ``uri``.

    The text is parsed, then the whole catalog is handed to
    ``DataCatalog.from_config``. Only when that fails is each top-level entry
    retried on its own so the failing datasets can be flagged individually.
    Diagnostics are always published, so a document that parses to something
    other than a mapping (for instance an empty file) clears earlier markers.

    Args:
        ls: Language server used to publish diagnostics.
        uri: Document URI the diagnostics are attached to.
        content: Raw YAML text of the catalog.
    """
    try:
        # NOTE(review): SafeLineLoader is defined elsewhere in this module and
        # presumably derives from yaml.SafeLoader while adding '__line__' keys;
        # confirm before relying on it for untrusted content.
        catalog_config = yaml.load(content, Loader=SafeLineLoader)
    except Exception as e:
        # Only a failure of the parse itself is reported as a YAML error.
        log_error(f"Error parsing catalog content: {e}")
        parse_failure = Diagnostic(
            range=Range(
                start=Position(line=0, character=0),
                end=Position(line=0, character=0),
            ),
            message=f"YAML parsing error: {e}",
            severity=DiagnosticSeverity.Error,
            source="Kedro LSP",
        )
        ls.publish_diagnostics(uri, [parse_failure])
        return

    diagnostics = []
    if isinstance(catalog_config, dict):
        try:
            # Fast path: a catalog that loads as a whole needs no per-entry work.
            DataCatalog.from_config(remove_line_numbers(catalog_config))
        except Exception:
            diagnostics = _collect_dataset_diagnostics(content, catalog_config)

    ls.publish_diagnostics(uri, diagnostics)


async def validate_catalog(ls: KedroLanguageServer, uri: str):
    """Validate the on-disk contents of the catalog file behind ``uri``.

    If the file is missing, or reading/validating it raises, an empty
    diagnostics list is published for ``uri``; failures are also logged.

    Args:
        ls: Language server used to publish diagnostics.
        uri: URI of the catalog file to validate.
    """
    catalog_path = pathlib.Path(uris.to_fs_path(uri))

    if catalog_path.exists():
        try:
            text = catalog_path.read_text(encoding='utf-8')
            await validate_catalog_content(ls, uri, text)
        except Exception as exc:
            log_error(f"Error reading file {catalog_path}: {exc}")
            # Drop any markers rather than leave ones that may be outdated.
            ls.publish_diagnostics(uri, [])
    else:
        # No file, so nothing can be wrong with it: clear its markers.
        ls.publish_diagnostics(uri, [])


async def periodic_revalidation(ls: KedroLanguageServer, interval: int = 5):
    """Re-run workspace-wide catalog validation forever, pausing between runs.

    Args:
        ls: Language server handed to ``validate_all_catalogs``.
        interval: Delay passed to ``asyncio.sleep`` after each run (seconds).
    """

    async def _sweep_once():
        # An ordinary failure in one sweep is logged and must not end the loop.
        try:
            await validate_all_catalogs(ls)
        except Exception as exc:
            log_error(f"Error during periodic revalidation: {exc}")

    while True:
        await _sweep_once()
        await asyncio.sleep(interval)


def is_dataset_importable(dataset_type: str) -> Tuple[bool, Optional[str]]:
    """Check that ``dataset_type`` names an attribute of an importable module.

    The value is split at its last dot into a module path and a class name;
    the module is imported and the class looked up on it.

    Args:
        dataset_type: Dotted path of the form ``module.ClassName``.

    Returns:
        ``(True, None)`` when the lookup succeeds, otherwise ``(False, reason)``
        where ``reason`` describes what went wrong.
    """
    invalid_format = "Invalid dataset type format. It should be 'module.ClassName'."

    # Validate the shape first so no later branch can touch an unbound name
    # (a non-string or dot-less value used to blow up inside the handlers).
    if not isinstance(dataset_type, str):
        return False, invalid_format
    module_name, _, class_name = dataset_type.rpartition('.')
    if not module_name or not class_name:
        return False, invalid_format

    try:
        module = importlib.import_module(module_name)
    except ImportError as e:
        return False, f"Module '{module_name}' cannot be imported. {e}"
    except ValueError:
        # import_module rejects malformed module names with ValueError.
        return False, invalid_format

    if not hasattr(module, class_name):
        return False, f"Class '{class_name}' not found in module '{module_name}'."
    return True, None


def find_line_number_and_character(text: str, dataset_name: str, field_name: str) -> Optional[Tuple[int, int]]:
    """Locate a field line inside a top-level dataset entry of YAML text.

    The dataset header is the unindented line starting with
    ``"<dataset_name>:"``. Within that entry only direct children (lines at
    the indentation of the first indented line after the header) are
    considered, so a same-named field nested deeper is not returned. Blank
    lines and ``#`` comment lines are ignored.

    Args:
        text: Full YAML document text.
        dataset_name: Top-level key whose block is searched.
        field_name: Key to find inside that block (e.g. ``"type"``).

    Returns:
        ``(line_index, indent)`` — the zero-based line number and the column
        of the field's first character — or ``None`` if it is not found.
    """
    header_prefix = f"{dataset_name}:"
    field_prefix = f"{field_name}:"
    in_dataset = False
    child_indent = None

    for idx, line in enumerate(text.split('\n')):
        stripped_line = line.strip()
        if not stripped_line or stripped_line.startswith('#'):
            continue

        # Indentation is measured on the raw line; the stripped text can never
        # start with a space, so it cannot tell where a block ends.
        indent = len(line) - len(line.lstrip())

        if indent == 0:
            # Every unindented line starts a new top-level entry.
            in_dataset = stripped_line.startswith(header_prefix)
            child_indent = None
            continue
        if not in_dataset:
            continue

        if child_indent is None:
            child_indent = indent
        if indent == child_indent and stripped_line.startswith(field_prefix):
            return idx, indent
    return None


def _get_global_defaults():
return {
"path": GLOBAL_SETTINGS.get("path", []),
Expand Down Expand Up @@ -542,7 +772,7 @@ def log_for_lsp_debug(msg: str):


def _is_pipeline(uri):
path = Path(uri)
path = Path(uris.to_fs_path(uri))
filename = path.name
if "pipeline" in str(filename):
return True
Expand Down
5 changes: 5 additions & 0 deletions src/common/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ async function createServer(
{ scheme: 'untitled', language: 'python' },
{ scheme: 'vscode-notebook', language: 'python' },
{ scheme: 'vscode-notebook-cell', language: 'python' },
{
scheme: 'file',
language: 'yaml',
pattern: '**/catalog*.yml',
},
],
outputChannel: outputChannel,
traceOutputChannel: outputChannel,
Expand Down

0 comments on commit 8f019d2

Please sign in to comment.