diff --git a/metadata-ingestion/tests/unit/mysql/mysql_test_stored_procedures.py b/metadata-ingestion/tests/unit/mysql/mysql_test_stored_procedures.py index fbcdb6d135ea37..58d38a7ed930a3 100644 --- a/metadata-ingestion/tests/unit/mysql/mysql_test_stored_procedures.py +++ b/metadata-ingestion/tests/unit/mysql/mysql_test_stored_procedures.py @@ -3,7 +3,8 @@ from pydantic import SecretStr from datahub.configuration.common import AllowDenyPattern -from datahub.ingestion.source.sql.mysql import MySQLConfig +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.sql.mysql import MySQLConfig, MySQLSource from datahub.ingestion.source.sql.mysql.job_models import ( MySQLDataJob, MySQLProcedureContainer, @@ -126,3 +127,154 @@ def test_procedure_metadata_handling(): "description": "Test procedure", "created_by": "test_user", } + + +def test_mysql_data_job_empty_properties(): + """Test MySQL data job with empty properties""" + procedure = MySQLStoredProcedure( + routine_schema="test_db", + routine_name="test_proc", + flow=MySQLProcedureContainer( + name="test_db.stored_procedures", + env="PROD", + db="test_db", + platform_instance=None, + ), + ) + + data_job = MySQLDataJob(entity=procedure) + + # Test initial empty properties + assert data_job.valued_properties == {} + assert len(data_job.job_properties) == 0 + + # Test empty string property - should be included since it's not None + data_job.add_property("test", "") + assert "test" in data_job.valued_properties + assert data_job.valued_properties["test"] == "" + + # Test string "None" property - should be included since it's not None + data_job.add_property("test2", "None") + assert "test2" in data_job.valued_properties + assert data_job.valued_properties["test2"] == "None" + + +def test_mysql_procedure_platform_instance(): + """Test MySQL procedure with platform instance""" + container = MySQLProcedureContainer( + name="test_db.stored_procedures", + env="PROD", + db="test_db", + platform_instance="custom-instance", + ) + + procedure = MySQLStoredProcedure( + routine_schema="test_db", routine_name="test_proc", flow=container + ) + + data_job = MySQLDataJob(entity=procedure) + platform_instance = data_job.as_maybe_platform_instance_aspect + + assert platform_instance is not None + # Check both platform and instance URNs + assert platform_instance.platform == "urn:li:dataPlatform:mysql" + assert ( + platform_instance.instance + == "urn:li:dataPlatformInstance:(urn:li:dataPlatform:mysql,custom-instance)" + ) + + +def test_mysql_data_job_aspects(): + """Test MySQL data job input/output aspects""" + procedure = MySQLStoredProcedure( + routine_schema="test_db", + routine_name="test_proc", + flow=MySQLProcedureContainer( + name="test_db.stored_procedures", + env="PROD", + db="test_db", + platform_instance=None, + ), + ) + + data_job = MySQLDataJob(entity=procedure) + + # Add some test data + data_job.incoming = ["dataset1", "dataset2"] + data_job.outgoing = ["dataset3"] + data_job.input_jobs = ["job1"] + + io_aspect = data_job.as_datajob_input_output_aspect + assert sorted(io_aspect.inputDatasets) == ["dataset1", "dataset2"] + assert io_aspect.outputDatasets == ["dataset3"] + assert io_aspect.inputDatajobs == ["job1"] + + +def test_mysql_flow_container_formatting(): + """Test MySQL flow container name formatting""" + container = MySQLProcedureContainer( + name="test,db.stored,procedures", # Contains commas + env="PROD", + db="test_db", + platform_instance=None, + ) + + assert container.formatted_name == "test-db.stored-procedures" + + +def test_stored_procedure_properties(): + """Test stored procedure additional properties""" + procedure = MySQLStoredProcedure( + routine_schema="test_db", + routine_name="test_proc", + flow=MySQLProcedureContainer( + name="test_db.stored_procedures", + env="PROD", + db="test_db", + platform_instance=None, + ), + code="CREATE PROCEDURE test_proc() BEGIN SELECT 1; END", + ) + + data_job = MySQLDataJob(entity=procedure) + + # Test adding various properties + data_job.add_property("CREATED", "2024-01-01") + data_job.add_property("LAST_ALTERED", "2024-01-02") + data_job.add_property("SQL_DATA_ACCESS", "MODIFIES") + data_job.add_property("SECURITY_TYPE", "DEFINER") + data_job.add_property("parameters", "IN param1 INT, OUT param2 VARCHAR") + + assert len(data_job.valued_properties) == 5 + assert all(value is not None for value in data_job.valued_properties.values()) + + +def test_temp_table_patterns(): + """Test comprehensive temp table pattern matching""" + config = MySQLConfig( + schema_pattern=AllowDenyPattern(allow=["test_schema"]), + table_pattern=AllowDenyPattern(allow=["test_schema.*"]), + ) + source = MySQLSource(ctx=PipelineContext(run_id="test"), config=config) + + # Mock the discovered_datasets property to include all our "permanent" tables + source.discovered_datasets = { + "test_schema.permanent_table", + "test_schema.regular_table", + "test_schema.table", + } + + test_cases = [ + ("test_schema.#temp", True), # Starts with # + ("test_schema._tmp", True), # Starts with _tmp + ("test_schema.regular_table", False), # In discovered_datasets + ("test_schema.table", False), # In discovered_datasets + ("test_schema.temp_123", True), # Not in discovered_datasets + ("other_schema.temp", False), # Schema not allowed + ] + + for table_name, expected in test_cases: + actual = source.is_temp_table(table_name) + assert actual == expected, ( + f"Failed for {table_name}. Expected {expected}, got {actual}" + ) diff --git a/metadata-ingestion/tests/unit/test_mariadb_source.py b/metadata-ingestion/tests/unit/test_mariadb_source.py index 2045a41efa70e1..31e7e9065c28ad 100644 --- a/metadata-ingestion/tests/unit/test_mariadb_source.py +++ b/metadata-ingestion/tests/unit/test_mariadb_source.py @@ -202,3 +202,86 @@ def test_mariadb_config(): assert not config.procedure_pattern.allowed("test_db.my_proc_temp") assert not config.procedure_pattern.allowed("other_db.proc") assert config.host_port == "localhost:3306" + + +def test_mariadb_error_handling(): + """Test error handling in MariaDB stored procedure fetching""" + mock_conn = MagicMock(spec=Connection) + + # Create mock result for ROUTINES query + routines_result = MagicMock() + routines_result.__iter__.return_value = [ + { + "ROUTINE_SCHEMA": "test_db", + "ROUTINE_NAME": "test_proc", + "ROUTINE_DEFINITION": "CREATE PROCEDURE test_proc() BEGIN SELECT 1; END", + "ROUTINE_COMMENT": "Test procedure", + "CREATED": "2024-01-01", + "LAST_ALTERED": "2024-01-02", + "SQL_DATA_ACCESS": "MODIFIES", + "SECURITY_TYPE": "DEFINER", + "DEFINER": "root@localhost", + } + ].__iter__() + + # Mock execution behavior + def mock_execute(query): + if "SHOW CREATE PROCEDURE" in str(query): + raise Exception("Failed to get procedure") + if "FROM information_schema.ROUTINES" in str(query): + return routines_result + return MagicMock() + + mock_conn.execute.side_effect = mock_execute + + source = MariaDBSource(ctx=PipelineContext(run_id="test"), config=MariaDBConfig()) + procedures = source._get_stored_procedures( + conn=mock_conn, db_name="test_db", schema="test_db" + ) + + # Verify the results + assert len(procedures) == 1 + assert procedures[0]["routine_schema"] == "test_db" + assert procedures[0]["routine_name"] == "test_proc" + # Should fall back to ROUTINE_DEFINITION when SHOW CREATE PROCEDURE fails + assert procedures[0]["code"] == "CREATE PROCEDURE test_proc() BEGIN SELECT 1; END" + assert "code" in procedures[0] + + +def test_mariadb_procedure_pattern_filtering(): + """Test procedure pattern filtering in MariaDB source""" + mock_inspector = MagicMock(spec=Inspector) + mock_engine = MagicMock() + mock_conn = MagicMock(spec=Connection) + mock_cm = MagicMock() + mock_cm.__enter__.return_value = mock_conn + mock_engine.connect.return_value = mock_cm + mock_inspector.engine = mock_engine + mock_inspector.engine.url.database = "test_db" + + config = MariaDBConfig( + host_port="localhost:3306", + include_stored_procedures=True, + procedure_pattern=AllowDenyPattern(allow=["test_db.*"], deny=[".*_temp"]), + ) + + source = MariaDBSource(ctx=PipelineContext(run_id="test"), config=config) + + with patch.object(source, "_get_stored_procedures") as mock_get_procs: + mock_get_procs.return_value = [ + { + "routine_schema": "test_db", + "routine_name": "test_proc_temp", + "code": "CREATE PROCEDURE test_proc_temp() BEGIN SELECT 1; END", + } + ] + + # Convert generator to list to execute it + workunits = list( + source.loop_stored_procedures( + inspector=mock_inspector, schema="test_db", sql_config=config + ) + ) + + # Should be filtered out by pattern + assert len(workunits) == 0