Skip to content

Commit

Permalink
ci: Remember failed workflows/td files
Browse files Browse the repository at this point in the history
to be able to run them first in the next run on a PR
  • Loading branch information
def- committed Feb 4, 2025
1 parent cd70347 commit bb8421b
Show file tree
Hide file tree
Showing 33 changed files with 397 additions and 151 deletions.
1 change: 0 additions & 1 deletion misc/python/materialize/ci_util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ def upload_junit_report(suite: str, junit_report: Path) -> None:
"""
if not buildkite.is_in_buildkite():
return
ui.section(f"Uploading report for suite {suite!r} to Buildkite Test Analytics")
suite = suite.upper().replace("-", "_")
token = os.getenv(f"BUILDKITE_TEST_ANALYTICS_API_KEY_{suite}")
if not token:
Expand Down
46 changes: 44 additions & 2 deletions misc/python/materialize/mzcompose/composition.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@
from contextlib import contextmanager
from inspect import Traceback, getframeinfo, getmembers, isfunction, stack
from tempfile import TemporaryFile
from typing import Any, TextIO, cast
from typing import Any, TextIO, TypeVar, cast

import psycopg
import sqlparse
import yaml
from psycopg import Connection, Cursor

from materialize import MZ_ROOT, mzbuild, spawn, ui
from materialize import MZ_ROOT, buildkite, mzbuild, spawn, ui
from materialize.mzcompose import cluster_replica_size_map, loader
from materialize.mzcompose.service import Service
from materialize.mzcompose.services.materialized import (
Expand Down Expand Up @@ -1564,3 +1564,45 @@ def cloud_hostname(self, quiet: bool = False) -> str:
# It is necessary to append the 'https://' protocol; otherwise, urllib can't parse it correctly.
cloud_hostname = urllib.parse.urlparse("https://" + cloud_url).hostname
return str(cloud_hostname)

T = TypeVar("T")

def test_parts(self, parts: list[T], process_func: Callable[[T], None]) -> None:
    """Run ``process_func`` on every part, highest-priority parts first.

    When running in Buildkite, part priorities are fetched from the test
    analytics database (15 second timeout). If that lookup fails, the
    error is printed and the parts keep their given order. Parts without
    a known priority count as 0; the priority lookup key is ``str(part)``.

    If ``process_func`` raises for a part, the failure is recorded (in
    Buildkite only) and the exception is re-raised, so later parts are
    not run. Pending analytics update statements are submitted in every
    case when running in Buildkite.

    Args:
        parts: The parts to process, e.g. workflow names.
        process_func: Callback invoked once per part.

    Raises:
        Exception: Whatever ``process_func`` raises for a part.
    """
    from materialize.test_analytics.config.test_analytics_db_config import (
        create_test_analytics_config,
    )
    from materialize.test_analytics.test_analytics_db import TestAnalyticsDb

    analytics_db: TestAnalyticsDb | None = None
    priorities: dict[str, int] = {}

    if buildkite.is_in_buildkite():
        print("~~~ Fetching part priorities")
        analytics_db = TestAnalyticsDb(create_test_analytics_config(self))
        try:
            priorities = analytics_db.builds.get_part_priorities(timeout=15)
            print(f"Priorities: {priorities}")
        except Exception as e:
            # Best effort: an unreachable analytics database must not
            # prevent the parts from running.
            print(f"Failed to fetch part priorities, using default order: {e}")

    def rank(part: T) -> int:
        return priorities.get(str(part), 0)

    # sorted() is stable, also with reverse=True, so parts of equal
    # priority keep their relative input order.
    ordered_parts = sorted(parts, key=rank, reverse=True)

    try:
        for current in ordered_parts:
            try:
                process_func(current)
            except Exception:
                if buildkite.is_in_buildkite():
                    assert analytics_db
                    analytics_db.builds.add_build_job_failure(str(current))
                # TODO: Should we raise immediately?
                raise
    finally:
        if buildkite.is_in_buildkite():
            assert analytics_db
            analytics_db.database_connector.submit_update_statements()
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,56 @@ def update_build_job_success(
)

self.database_connector.add_update_statements(sql_statements)

def add_build_job_failure(
    self,
    part: str,
) -> None:
    """Register an INSERT recording that ``part`` failed in this job.

    The job id is read from the ``BUILDKITE_JOB_ID`` Buildkite variable.
    The statement is only handed to
    ``database_connector.add_update_statements``; this method does not
    execute it itself.

    Args:
        part: Identifier of the failed part, e.g. a workflow name or a
            testdrive file.
    """
    job_id = buildkite.get_var(BuildkiteEnvVar.BUILDKITE_JOB_ID)

    # Both values go through as_sanitized_literal before being embedded
    # in the statement text.
    insert_failure = f"""
INSERT INTO build_job_failure
(
build_job_id,
part
)
VALUES
(
{as_sanitized_literal(job_id)},
{as_sanitized_literal(part)}
)
"""

    self.database_connector.add_update_statements([insert_failure])

def get_part_priorities(self, timeout: int) -> dict[str, int]:
    """Return the priority of each previously failed part of this step.

    The branch and step key are read from the ``BUILDKITE_BRANCH`` and
    ``BUILDKITE_STEP_KEY`` Buildkite variables. A part gets the highest
    priority that applies to it:

    * 2 if it is listed in ``mv_build_job_failed_on_branch`` for the
      current branch and step key,
    * 1 if it is listed in ``mv_build_job_failed`` for the current step
      key (no branch filter).

    Parts that appear in neither view are absent from the result.

    Args:
        timeout: Value, in seconds, used for ``statement_timeout`` on
            the cursor before running the query.

    Returns:
        Mapping from part to priority.
    """
    branch = buildkite.get_var(BuildkiteEnvVar.BUILDKITE_BRANCH)
    build_step_key = buildkite.get_var(BuildkiteEnvVar.BUILDKITE_STEP_KEY)

    with self.database_connector.create_cursor() as cursor:
        cursor.execute('SET cluster = "test_analytics"')
        cursor.execute(f"SET statement_timeout = '{timeout}s'".encode("utf-8"))

        # prio 2: failures on this branch; prio 1: failures of this step
        # on any branch. MAX() keeps the higher one per part.
        priority_query = f"""
SELECT part, MAX(prio)
FROM (
SELECT part, 2 AS prio
FROM mv_build_job_failed_on_branch
WHERE branch = {as_sanitized_literal(branch)}
AND build_step_key = {as_sanitized_literal(build_step_key)}
UNION
SELECT part, 1 AS prio
FROM mv_build_job_failed
WHERE build_step_key = {as_sanitized_literal(build_step_key)}
)
GROUP BY part;
""".encode("utf-8")
        cursor.execute(priority_query)

        # Each row is a (part, prio) pair.
        rows = cursor.fetchall()
        return dict(rows)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Copyright Materialize, Inc. and contributors. All rights reserved.
--
-- Use of this software is governed by the Business Source License
-- included in the LICENSE file at the root of this repository.
--
-- As of the Change Date specified in that file, in accordance with
-- the Business Source License, use of this software will be governed
-- by the Apache License, Version 2.0.

-- part of a build job that failed, can be a testdrive file or a workflow
-- One row is inserted per recorded failure (see add_build_job_failure).
CREATE TABLE build_job_failure (
-- Buildkite job id; joined against build_job.build_job_id by the
-- mv_build_job_failed* materialized views.
build_job_id TEXT NOT NULL,
-- Identifier of the failed part, stored as text.
part TEXT NOT NULL
);

ALTER TABLE build_job_failure OWNER TO qa;
GRANT SELECT, INSERT, UPDATE ON TABLE build_job_failure TO "hetzner-ci";
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- Copyright Materialize, Inc. and contributors. All rights reserved.
--
-- Use of this software is governed by the Business Source License
-- included in the LICENSE file at the root of this repository.
--
-- As of the Change Date specified in that file, in accordance with
-- the Business Source License, use of this software will be governed
-- by the Apache License, Version 2.0.

-- Failed parts together with the step key and branch of the build they ran
-- in. Queried by get_part_priorities, filtered on branch and build_step_key,
-- to give parts that failed on the current branch the highest priority.
CREATE OR REPLACE MATERIALIZED VIEW mv_build_job_failed_on_branch AS
SELECT build_step_key, branch, part
FROM build
JOIN build_job ON build.build_id = build_job.build_id
JOIN build_job_failure ON build_job.build_job_id = build_job_failure.build_job_id;

-- Failed parts per step key, regardless of branch (no join against build and
-- no time filter). Queried by get_part_priorities, filtered on
-- build_step_key, for the lower priority level.
CREATE OR REPLACE MATERIALIZED VIEW mv_build_job_failed AS
SELECT build_step_key, part
FROM build_job
JOIN build_job_failure ON build_job.build_job_id = build_job_failure.build_job_id;

-- Default indexes on both views.
CREATE DEFAULT INDEX ON mv_build_job_failed_on_branch;
CREATE DEFAULT INDEX ON mv_build_job_failed;

ALTER MATERIALIZED VIEW mv_build_job_failed_on_branch OWNER TO qa;
GRANT SELECT ON TABLE mv_build_job_failed_on_branch TO "hetzner-ci";
ALTER MATERIALIZED VIEW mv_build_job_failed OWNER TO qa;
GRANT SELECT ON TABLE mv_build_job_failed TO "hetzner-ci";
13 changes: 9 additions & 4 deletions test/0dt/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
Materialized,
)
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.postgres import (
CockroachOrPostgresMetadata,
Postgres,
Expand All @@ -49,6 +50,7 @@
Kafka(),
SchemaRegistry(),
CockroachOrPostgresMetadata(),
Mz(app_password=""),
Materialized(
name="mz_old",
sanity_restart=False,
Expand Down Expand Up @@ -77,14 +79,17 @@


def workflow_default(c: Composition) -> None:
for name in buildkite.shard_list(
list(c.workflows.keys()), lambda workflow: workflow
):
def process(name: str) -> None:
if name == "default":
continue
return
with c.test_case(name):
c.workflow(name)

workflows = buildkite.shard_list(
list(c.workflows.keys()), lambda workflow: workflow
)
c.test_parts(workflows, process)


def workflow_read_only(c: Composition) -> None:
"""Verify read-only mode."""
Expand Down
7 changes: 6 additions & 1 deletion test/aws-localstack/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
)
from materialize.mzcompose.services.localstack import Localstack
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.testdrive import Testdrive

ENVIRONMENT_NAME = f"environment-{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}"
Expand All @@ -43,6 +44,7 @@

SERVICES = [
Localstack(),
Mz(app_password=""),
Materialized(
depends_on=["localstack"],
environment_extra=[
Expand All @@ -66,10 +68,13 @@


def workflow_default(c: Composition) -> None:
for name in ["secrets-manager", "aws-connection", "copy-to-s3"]:
def process(name: str) -> None:
with c.test_case(name):
c.workflow(name)

workflows = ["secrets-manager", "aws-connection", "copy-to-s3"]
c.test_parts(workflows, process)


def workflow_secrets_manager(c: Composition) -> None:
c.up("localstack")
Expand Down
9 changes: 7 additions & 2 deletions test/balancerd/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from materialize.mzcompose.services.balancerd import Balancerd
from materialize.mzcompose.services.frontegg import FronteggMock
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.test_certs import TestCerts
from materialize.mzcompose.services.testdrive import Testdrive

Expand Down Expand Up @@ -118,6 +119,7 @@ def app_password(email: str) -> str:
"secrets:/secrets",
],
),
Mz(app_password=""),
Materialized(
options=[
# Enable TLS on the public port to verify that balancerd is connecting to the balancerd
Expand Down Expand Up @@ -185,11 +187,14 @@ def pg8000_sql_cursor(
def workflow_default(c: Composition) -> None:
c.down(destroy_volumes=True)

for name in c.workflows:
def process(name: str) -> None:
if name in ["default", "plaintext"]:
continue
return
with c.test_case(name):
c.workflow(name)

c.test_parts(list(c.workflows.keys()), process)

with c.test_case("plaintext"):
c.workflow("plaintext")

Expand Down
12 changes: 5 additions & 7 deletions test/bounded-memory/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -1091,16 +1091,14 @@ class KafkaScenario(Scenario):


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
for name in c.workflows:
if name == "default":
continue

if name == "minimization-search":
continue

def process(name: str) -> None:
if name in ["default", "minimization-search"]:
return
with c.test_case(name):
c.workflow(name)

c.test_parts(list(c.workflows.keys()), process)


def workflow_main(c: Composition, parser: WorkflowArgumentParser) -> None:
"""Process various datasets in a memory-constrained environment in order
Expand Down
9 changes: 6 additions & 3 deletions test/chbench/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.metabase import Metabase
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.zookeeper import Zookeeper

Expand All @@ -28,6 +29,7 @@
SchemaRegistry(),
Debezium(),
MySql(root_password="rootpw"),
Mz(app_password=""),
Materialized(),
Metabase(),
Service(
Expand All @@ -42,13 +44,14 @@


def workflow_default(c: Composition) -> None:
for name in c.workflows:
def process(name: str) -> None:
if name == "default":
continue

return
with c.test_case(name):
c.workflow(name)

c.test_parts(list(c.workflows.keys()), process)


def workflow_no_load(c: Composition, parser: WorkflowArgumentParser) -> None:
"""Run CH-benCHmark without any load on Materialize"""
Expand Down
11 changes: 7 additions & 4 deletions test/cluster/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from materialize.mzcompose.services.localstack import Localstack
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.minio import Minio
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.postgres import (
CockroachOrPostgresMetadata,
Postgres,
Expand All @@ -60,6 +61,7 @@
Clusterd(name="clusterd2"),
Clusterd(name="clusterd3"),
Clusterd(name="clusterd4"),
Mz(app_password=""),
Materialized(
# We use mz_panic() in some test scenarios, so environmentd must stay up.
propagate_crashes=False,
Expand All @@ -82,20 +84,21 @@


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
for name in buildkite.shard_list(
list(c.workflows.keys()), lambda workflow: workflow
):
def process(name: str) -> None:
# incident-70 requires more memory, runs in separate CI step
# concurrent-connections is too flaky
if name in (
"default",
"test-incident-70",
"test-concurrent-connections",
):
continue
return
with c.test_case(name):
c.workflow(name)

files = buildkite.shard_list(list(c.workflows.keys()), lambda workflow: workflow)
c.test_parts(files, process)


def workflow_test_smoke(c: Composition, parser: WorkflowArgumentParser) -> None:
"""Run testdrive in a variety of cluster configurations."""
Expand Down
Loading

0 comments on commit bb8421b

Please sign in to comment.