From 8f019d2eefb21c0dea2829767c30d115e390c160 Mon Sep 17 00:00:00 2001
From: Sajid Alam <90610031+SajidAlamQB@users.noreply.github.com>
Date: Fri, 24 Jan 2025 13:39:05 +0000
Subject: [PATCH] [SPIKE] Test validate dataset classes in catalog using LSP (#159)

* spike lsp

Signed-off-by: Sajid Alam

* fix diagnostic range

Signed-off-by: Sajid Alam

* more verbose error messages

Signed-off-by: Sajid Alam

* scan all files

Signed-off-by: Sajid Alam

* skip dataset names starting with _

Signed-off-by: Sajid Alam

* Update lsp_server.py

Signed-off-by: Sajid Alam

* check catalog live while writing and add periodic revalidation

Signed-off-by: Sajid Alam

* only look at catalog files

Signed-off-by: Sajid Alam

* Update lsp_server.py

Signed-off-by: Sajid Alam

---------

Signed-off-by: Sajid Alam
---
 bundled/tool/lsp_server.py | 234 ++++++++++++++++++++++++++++++++++++-
 src/common/server.ts       |   5 +
 2 files changed, 237 insertions(+), 2 deletions(-)

diff --git a/bundled/tool/lsp_server.py b/bundled/tool/lsp_server.py
index 40d9517..0abacdd 100644
--- a/bundled/tool/lsp_server.py
+++ b/bundled/tool/lsp_server.py
@@ -5,17 +5,21 @@
 from __future__ import annotations
 
 import glob
+import importlib
 import json
 import logging
 import os
 import pathlib
 import re
 import sys
+import asyncio
 from pathlib import Path
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Tuple, Optional
 
 from common import update_sys_path
 
+from kedro.io import DataCatalog
+
 # **********************************************************
 # Update sys.path before importing any bundled libraries.
 # **********************************************************
@@ -46,6 +50,8 @@
     TEXT_DOCUMENT_HOVER,
     TEXT_DOCUMENT_REFERENCES,
     WORKSPACE_DID_CHANGE_CONFIGURATION,
+    TEXT_DOCUMENT_DID_OPEN,
+    TEXT_DOCUMENT_DID_CHANGE,
     CompletionItem,
     CompletionList,
     CompletionOptions,
@@ -59,6 +65,17 @@
     Position,
     Range,
     TextDocumentPositionParams,
+    DidOpenTextDocumentParams,
+    DidChangeTextDocumentParams,
+    Diagnostic,
+    DiagnosticSeverity,
+    DidChangeWatchedFilesParams,
+    FileChangeType,
+    RegistrationParams,
+    Registration,
+    DidChangeWatchedFilesRegistrationOptions,
+    FileSystemWatcher,
+    WatchKind,
 )
 from pygls import uris, workspace
 from pygls.workspace import TextDocument
@@ -182,6 +199,30 @@ async def initialize(params: lsp.InitializeParams) -> None:
     )
     _check_project()
+    # After initialisation, validate all catalog files
+    await validate_all_catalogs(LSP_SERVER)
+
+    # Start periodic revalidation
+    asyncio.create_task(periodic_revalidation(LSP_SERVER))
+
+    # Set up file watchers for catalog files
+    catalog_pattern = FileSystemWatcher(
+        glob_pattern="**/catalog*.y?(a)ml",
+        kind=(WatchKind.Create | WatchKind.Change | WatchKind.Delete)
+    )
+    await LSP_SERVER.register_capability_async(
+        RegistrationParams(
+            registrations=[
+                Registration(
+                    id="catalogWatcher",
+                    method="workspace/didChangeWatchedFiles",
+                    register_options=DidChangeWatchedFilesRegistrationOptions(
+                        watchers=[catalog_pattern]
+                    ),
+                )
+            ]
+        )
+    )
 
 
 ### Kedro LSP logic
 def _get_conf_paths(server: KedroLanguageServer, key):
@@ -481,6 +522,195 @@ def did_change_configuration(
     """
 
 
+@LSP_SERVER.feature(TEXT_DOCUMENT_DID_OPEN)
+async def did_open(ls: KedroLanguageServer, params: DidOpenTextDocumentParams):
+    """Validate catalog content when a file is opened."""
+    document_uri = params.text_document.uri
+    file_path = pathlib.Path(uris.to_fs_path(document_uri))
+
+    # Only validate files with 'catalog' in the name and YAML extensions
+    if not (file_path.name.startswith("catalog") and file_path.suffix in {".yml", ".yaml"}):
+        return
+
+    document = ls.workspace.get_text_document(document_uri)
+    await validate_catalog_content(ls, document_uri, document.source)
+
+
+@LSP_SERVER.feature(TEXT_DOCUMENT_DID_CHANGE)
+async def did_change(ls: KedroLanguageServer, params: DidChangeTextDocumentParams):
+    """Validate the catalog file live on every change."""
+    document_uri = params.text_document.uri
+    file_path = pathlib.Path(uris.to_fs_path(document_uri))
+
+    # Only validate files with 'catalog' in the name and YAML extensions
+    if not (file_path.name.startswith("catalog") and file_path.suffix in {".yml", ".yaml"}):
+        return
+
+    document = ls.workspace.get_text_document(document_uri)
+    updated_content = document.source  # Live content of the file
+    await validate_catalog_content(ls, document_uri, updated_content)
+
+
+@LSP_SERVER.feature(lsp.WORKSPACE_DID_CHANGE_WATCHED_FILES)
+async def did_change_watched_files(ls: KedroLanguageServer, params: DidChangeWatchedFilesParams):
+    """Handle changes to catalog files."""
+    for change in params.changes:
+        if change.type in (FileChangeType.Created, FileChangeType.Changed):
+            await validate_catalog(ls, change.uri)
+        elif change.type == FileChangeType.Deleted:
+            # Clear diagnostics for deleted files
+            ls.publish_diagnostics(change.uri, [])
+
+
+async def validate_all_catalogs(ls: KedroLanguageServer):
+    """Validate all catalog files in the workspace."""
+    _check_project()
+    if not ls.is_kedro_project():
+        return
+
+    catalog_files = find_all_catalog_files(ls.workspace.root_path)
+    for file_uri in catalog_files:
+        await validate_catalog(ls, file_uri)
+
+
+def find_all_catalog_files(root_path):
+    """Find all catalog files in the workspace."""
+    catalog_files = []
+    for dirpath, _, filenames in os.walk(root_path):
+        for filename in filenames:
+            if filename.startswith('catalog') and filename.endswith(('.yml', '.yaml')):
+                file_path = os.path.join(dirpath, filename)
+                file_uri = uris.from_fs_path(file_path)
+                catalog_files.append(file_uri)
+    return catalog_files
+
+
+def remove_line_numbers(config):
+    if isinstance(config, dict):
+        return {k: remove_line_numbers(v) for k, v in config.items() if k != '__line__'}
+    elif isinstance(config, list):
+        return [remove_line_numbers(i) for i in config]
+    else:
+        return config
+
+
+async def validate_catalog_content(ls: KedroLanguageServer, uri: str, content: str):
+    """Validate catalog content dynamically."""
+    diagnostics = []
+
+    try:
+        # Parse the YAML content
+        catalog_config = yaml.load(content, Loader=SafeLineLoader)
+        if not isinstance(catalog_config, dict):
+            return  # Invalid catalog format
+
+        # Remove '__line__' keys
+        clean_catalog_config = remove_line_numbers(catalog_config)
+
+        try:
+            # Attempt to create a DataCatalog with the cleaned catalog config
+            DataCatalog.from_config(clean_catalog_config)
+        except Exception as e:
+            # Check each dataset individually if the entire catalog fails
+            for dataset_name, dataset_config in catalog_config.items():
+                if dataset_name.startswith("_"):
+                    continue  # Skip private datasets
+
+                clean_dataset_config = remove_line_numbers(dataset_config)
+
+                try:
+                    DataCatalog.from_config({dataset_name: clean_dataset_config})
+                except Exception as dataset_exception:
+                    # Add diagnostic for invalid dataset
+                    line_info = find_line_number_and_character(content, dataset_name, "type")
+                    if line_info:
+                        line_number, start_char = line_info
+                        dataset_type = dataset_config.get("type", "Unknown")
+                        end_char = start_char + len(f"type: {dataset_type}")
+                        diagnostic = Diagnostic(
+                            range=Range(
+                                start=Position(line=line_number, character=start_char),
+                                end=Position(line=line_number, character=end_char),
+                            ),
+                            message=f"Dataset '{dataset_name}' has an invalid type '{dataset_type}'. {dataset_exception}",
+                            severity=DiagnosticSeverity.Error,
+                            source="Kedro LSP",
+                        )
+                        diagnostics.append(diagnostic)
+    except Exception as e:
+        # Handle YAML parsing errors
+        log_error(f"Error parsing catalog content: {e}")
+        diagnostic = Diagnostic(
+            range=Range(
+                start=Position(line=0, character=0),
+                end=Position(line=0, character=0),
+            ),
+            message=f"YAML parsing error: {e}",
+            severity=DiagnosticSeverity.Error,
+            source="Kedro LSP",
+        )
+        diagnostics.append(diagnostic)
+
+    # Publish diagnostics for the file
+    ls.publish_diagnostics(uri, diagnostics)
+
+
+async def validate_catalog(ls: KedroLanguageServer, uri: str):
+    """Validate a catalog file by reading its content from disk."""
+    file_path = pathlib.Path(uris.to_fs_path(uri))
+    if not file_path.exists():
+        # Clear diagnostics if the file does not exist
+        ls.publish_diagnostics(uri, [])
+        return
+
+    try:
+        content = file_path.read_text(encoding='utf-8')
+        await validate_catalog_content(ls, uri, content)
+    except Exception as e:
+        log_error(f"Error reading file {file_path}: {e}")
+        ls.publish_diagnostics(uri, [])  # Clear diagnostics if reading fails
+
+
+async def periodic_revalidation(ls: KedroLanguageServer, interval: int = 5):
+    """Periodically revalidate all catalog files."""
+    while True:
+        try:
+            await validate_all_catalogs(ls)
+        except Exception as e:
+            log_error(f"Error during periodic revalidation: {e}")
+        await asyncio.sleep(interval)
+
+
+def is_dataset_importable(dataset_type: str) -> Tuple[bool, Optional[str]]:
+    try:
+        module_name, class_name = dataset_type.rsplit('.', 1)
+        module = importlib.import_module(module_name)
+        getattr(module, class_name)
+        return True, None
+    except ImportError as e:
+        return False, f"Module '{module_name}' cannot be imported. {e}"
+    except AttributeError:
+        return False, f"Class '{class_name}' not found in module '{module_name}'."
+    except ValueError:
+        return False, "Invalid dataset type format. It should be 'module.ClassName'."
+
+
+def find_line_number_and_character(text: str, dataset_name: str, field_name: str) -> Optional[Tuple[int, int]]:
+    lines = text.split('\n')
+    in_dataset = False
+    for idx, line in enumerate(lines):
+        stripped_line = line.strip()
+        if stripped_line.startswith(f"{dataset_name}:"):
+            in_dataset = True
+        elif in_dataset and stripped_line.startswith(f"{field_name}:"):
+            # Calculate the character position accounting for indentation
+            start_char = len(line) - len(line.lstrip())
+            return idx, start_char
+        elif stripped_line and not stripped_line.startswith(' '):
+            in_dataset = False  # End of current dataset
+    return None
+
+
 def _get_global_defaults():
     return {
         "path": GLOBAL_SETTINGS.get("path", []),
@@ -542,7 +772,7 @@ def log_for_lsp_debug(msg: str):
 
 
 def _is_pipeline(uri):
-    path = Path(uri)
+    path = Path(uris.to_fs_path(uri))
     filename = path.name
     if "pipeline" in str(filename):
         return True
diff --git a/src/common/server.ts b/src/common/server.ts
index e6f2e5b..b19143f 100644
--- a/src/common/server.ts
+++ b/src/common/server.ts
@@ -92,6 +92,11 @@ async function createServer(
       { scheme: 'untitled', language: 'python' },
      { scheme: 'vscode-notebook', language: 'python' },
       { scheme: 'vscode-notebook-cell', language: 'python' },
+      {
+        scheme: 'file',
+        language: 'yaml',
+        pattern: '**/catalog*.yml',
+      },
     ],
     outputChannel: outputChannel,
     traceOutputChannel: outputChannel,
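For context, the validation strategy in validate_catalog_content reduces to calling DataCatalog.from_config() on each catalog entry in isolation and turning any exception into a diagnostic. Below is a rough, standalone sketch of that idea, assuming kedro and kedro-datasets are installed; the catalog entries and dataset types are made up for illustration and are not part of the patch.

    from kedro.io import DataCatalog

    # Made-up catalog entries for illustration; "pandas.CSVDataset" assumes a
    # recent kedro-datasets release (older releases spell it "pandas.CSVDataSet").
    example_catalog = {
        "companies": {
            "type": "pandas.CSVDataset",
            "filepath": "data/01_raw/companies.csv",
        },
        "broken": {
            "type": "pandas.DoesNotExist",  # bad class name, should be reported
            "filepath": "data/01_raw/broken.csv",
        },
        "_csv_defaults": {
            "type": "pandas.CSVDataset",  # leading underscore: skipped, as in the patch
        },
    }

    for name, config in example_catalog.items():
        if name.startswith("_"):
            continue  # same rule as validate_catalog_content: skip "private" entries
        try:
            DataCatalog.from_config({name: config})
        except Exception as exc:  # kedro typically raises a DatasetError here
            print(f"{name}: {exc}")

Note that the is_dataset_importable() helper added by the patch is defined but never called; it could slot into the same per-dataset loop to distinguish a missing module from a misspelled class name in the diagnostic message.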