Skip to content

Commit

Permalink
more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
acrylJonny committed Jan 26, 2025
1 parent 5006b57 commit 477a5fa
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 1 deletion.
154 changes: 153 additions & 1 deletion metadata-ingestion/tests/unit/mysql/mysql_test_stored_procedures.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from pydantic import SecretStr

from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.source.sql.mysql import MySQLConfig
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.sql.mysql import MySQLConfig, MySQLSource
from datahub.ingestion.source.sql.mysql.job_models import (
MySQLDataJob,
MySQLProcedureContainer,
Expand Down Expand Up @@ -126,3 +127,154 @@ def test_procedure_metadata_handling():
"description": "Test procedure",
"created_by": "test_user",
}


def test_mysql_data_job_empty_properties():
"""Test MySQL data job with empty properties"""
procedure = MySQLStoredProcedure(
routine_schema="test_db",
routine_name="test_proc",
flow=MySQLProcedureContainer(
name="test_db.stored_procedures",
env="PROD",
db="test_db",
platform_instance=None,
),
)

data_job = MySQLDataJob(entity=procedure)

# Test initial empty properties
assert data_job.valued_properties == {}
assert len(data_job.job_properties) == 0

# Test empty string property - should be included since it's not None
data_job.add_property("test", "")
assert "test" in data_job.valued_properties
assert data_job.valued_properties["test"] == ""

# Test string "None" property - should be included since it's not None
data_job.add_property("test2", "None")
assert "test2" in data_job.valued_properties
assert data_job.valued_properties["test2"] == "None"


def test_mysql_procedure_platform_instance():
"""Test MySQL procedure with platform instance"""
container = MySQLProcedureContainer(
name="test_db.stored_procedures",
env="PROD",
db="test_db",
platform_instance="custom-instance",
)

procedure = MySQLStoredProcedure(
routine_schema="test_db", routine_name="test_proc", flow=container
)

data_job = MySQLDataJob(entity=procedure)
platform_instance = data_job.as_maybe_platform_instance_aspect

assert platform_instance is not None
# Check both platform and instance URNs
assert platform_instance.platform == "urn:li:dataPlatform:mysql"
assert (
platform_instance.instance
== "urn:li:dataPlatformInstance:(urn:li:dataPlatform:mysql,custom-instance)"
)


def test_mysql_data_job_aspects():
"""Test MySQL data job input/output aspects"""
procedure = MySQLStoredProcedure(
routine_schema="test_db",
routine_name="test_proc",
flow=MySQLProcedureContainer(
name="test_db.stored_procedures",
env="PROD",
db="test_db",
platform_instance=None,
),
)

data_job = MySQLDataJob(entity=procedure)

# Add some test data
data_job.incoming = ["dataset1", "dataset2"]
data_job.outgoing = ["dataset3"]
data_job.input_jobs = ["job1"]

io_aspect = data_job.as_datajob_input_output_aspect
assert sorted(io_aspect.inputDatasets) == ["dataset1", "dataset2"]
assert io_aspect.outputDatasets == ["dataset3"]
assert io_aspect.inputDatajobs == ["job1"]


def test_mysql_flow_container_formatting():
"""Test MySQL flow container name formatting"""
container = MySQLProcedureContainer(
name="test,db.stored,procedures", # Contains commas
env="PROD",
db="test_db",
platform_instance=None,
)

assert container.formatted_name == "test-db.stored-procedures"


def test_stored_procedure_properties():
"""Test stored procedure additional properties"""
procedure = MySQLStoredProcedure(
routine_schema="test_db",
routine_name="test_proc",
flow=MySQLProcedureContainer(
name="test_db.stored_procedures",
env="PROD",
db="test_db",
platform_instance=None,
),
code="CREATE PROCEDURE test_proc() BEGIN SELECT 1; END",
)

data_job = MySQLDataJob(entity=procedure)

# Test adding various properties
data_job.add_property("CREATED", "2024-01-01")
data_job.add_property("LAST_ALTERED", "2024-01-02")
data_job.add_property("SQL_DATA_ACCESS", "MODIFIES")
data_job.add_property("SECURITY_TYPE", "DEFINER")
data_job.add_property("parameters", "IN param1 INT, OUT param2 VARCHAR")

assert len(data_job.valued_properties) == 5
assert all(value is not None for value in data_job.valued_properties.values())


def test_temp_table_patterns():
"""Test comprehensive temp table pattern matching"""
config = MySQLConfig(
schema_pattern=AllowDenyPattern(allow=["test_schema"]),
table_pattern=AllowDenyPattern(allow=["test_schema.*"]),
)
source = MySQLSource(ctx=PipelineContext(run_id="test"), config=config)

# Mock the discovered_datasets property to include all our "permanent" tables
source.discovered_datasets = {
"test_schema.permanent_table",
"test_schema.regular_table",
"test_schema.table",
}

test_cases = [
("test_schema.#temp", True), # Starts with #
("test_schema._tmp", True), # Starts with _tmp
("test_schema.regular_table", False), # In discovered_datasets
("test_schema.table", False), # In discovered_datasets
("test_schema.temp_123", True), # Not in discovered_datasets
("other_schema.temp", False), # Schema not allowed
]

for table_name, expected in test_cases:
actual = source.is_temp_table(table_name)
assert actual == expected, (
f"Failed for {table_name}. Expected {expected}, got {actual}"
)
83 changes: 83 additions & 0 deletions metadata-ingestion/tests/unit/test_mariadb_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,86 @@ def test_mariadb_config():
assert not config.procedure_pattern.allowed("test_db.my_proc_temp")
assert not config.procedure_pattern.allowed("other_db.proc")
assert config.host_port == "localhost:3306"


def test_mariadb_error_handling():
"""Test error handling in MariaDB stored procedure fetching"""
mock_conn = MagicMock(spec=Connection)

# Create mock result for ROUTINES query
routines_result = MagicMock()
routines_result.__iter__.return_value = [
{
"ROUTINE_SCHEMA": "test_db",
"ROUTINE_NAME": "test_proc",
"ROUTINE_DEFINITION": "CREATE PROCEDURE test_proc() BEGIN SELECT 1; END",
"ROUTINE_COMMENT": "Test procedure",
"CREATED": "2024-01-01",
"LAST_ALTERED": "2024-01-02",
"SQL_DATA_ACCESS": "MODIFIES",
"SECURITY_TYPE": "DEFINER",
"DEFINER": "root@localhost",
}
].__iter__()

# Mock execution behavior
def mock_execute(query):
if "SHOW CREATE PROCEDURE" in str(query):
raise Exception("Failed to get procedure")
if "FROM information_schema.ROUTINES" in str(query):
return routines_result
return MagicMock()

mock_conn.execute.side_effect = mock_execute

source = MariaDBSource(ctx=PipelineContext(run_id="test"), config=MariaDBConfig())
procedures = source._get_stored_procedures(
conn=mock_conn, db_name="test_db", schema="test_db"
)

# Verify the results
assert len(procedures) == 1
assert procedures[0]["routine_schema"] == "test_db"
assert procedures[0]["routine_name"] == "test_proc"
# Should fall back to ROUTINE_DEFINITION when SHOW CREATE PROCEDURE fails
assert procedures[0]["code"] == "CREATE PROCEDURE test_proc() BEGIN SELECT 1; END"
assert "code" in procedures[0]


def test_mariadb_procedure_pattern_filtering():
"""Test procedure pattern filtering in MariaDB source"""
mock_inspector = MagicMock(spec=Inspector)
mock_engine = MagicMock()
mock_conn = MagicMock(spec=Connection)
mock_cm = MagicMock()
mock_cm.__enter__.return_value = mock_conn
mock_engine.connect.return_value = mock_cm
mock_inspector.engine = mock_engine
mock_inspector.engine.url.database = "test_db"

config = MariaDBConfig(
host_port="localhost:3306",
include_stored_procedures=True,
procedure_pattern=AllowDenyPattern(allow=["test_db.*"], deny=[".*_temp"]),
)

source = MariaDBSource(ctx=PipelineContext(run_id="test"), config=config)

with patch.object(source, "_get_stored_procedures") as mock_get_procs:
mock_get_procs.return_value = [
{
"routine_schema": "test_db",
"routine_name": "test_proc_temp",
"code": "CREATE PROCEDURE test_proc_temp() BEGIN SELECT 1; END",
}
]

# Convert generator to list to execute it
workunits = list(
source.loop_stored_procedures(
inspector=mock_inspector, schema="test_db", sql_config=config
)
)

# Should be filtered out by pattern
assert len(workunits) == 0

0 comments on commit 477a5fa

Please sign in to comment.