Skip to content

Commit

Permalink
Merge pull request #30820 from def-/pr-deterministic-upsert-source
Browse files Browse the repository at this point in the history
Reproduce 0dt upsert source panic in more deterministic workflow
  • Loading branch information
def- authored Jan 12, 2025
2 parents 4551e00 + 86f377c commit 7790629
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,7 @@ steps:
- ./ci/plugins/mzcompose:
composition: 0dt
agents:
queue: hetzner-aarch64-8cpu-16gb
queue: hetzner-aarch64-16cpu-32gb

- id: emulator
label: Materialize Emulator
Expand Down
112 changes: 112 additions & 0 deletions test/0dt/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
"""

import time
from datetime import datetime, timedelta
from textwrap import dedent
from threading import Thread

from psycopg.errors import OperationalError

Expand Down Expand Up @@ -1388,3 +1390,113 @@ def get_correction_metrics():
raise AssertionError(
f"unexpected correction metrics: {insertions=}, {deletions=}"
)


def workflow_upsert_sources(c: Composition) -> None:
    """Reproduce a 0dt upsert-source panic in a deterministic workflow.

    Creates many Kafka upsert sources (one per worker thread), each with an
    index and a materialized view on top, and keeps ingesting data into all
    of them while the main thread repeatedly performs zero-downtime (0dt)
    deployments: bring up a new Materialize generation, wait until it is
    ready, promote it, then swap roles and repeat until the deadline.

    Args:
        c: The mzcompose composition to operate on.
    """
    c.down(destroy_volumes=True)
    c.up("zookeeper", "kafka", "schema-registry", "postgres", "mysql", "mz_old")
    c.up("testdrive", persistent=True)
    # One source + one materialized view per worker; the system limits below
    # are set to twice that to leave headroom.
    num_threads = 500

    c.sql(
        f"""
        DROP CLUSTER IF EXISTS cluster CASCADE;
        CREATE CLUSTER cluster SIZE '2-1';
        GRANT ALL ON CLUSTER cluster TO materialize;
        ALTER SYSTEM SET cluster = cluster;
        ALTER SYSTEM SET max_sources = {num_threads * 2};
        ALTER SYSTEM SET max_materialized_views = {num_threads * 2};
        """,
        service="mz_old",
        port=6877,
        user="mz_system",
    )

    # Shared connections used by every per-thread source definition below.
    c.testdrive(
        dedent(
            """
            > SET CLUSTER = cluster;
            > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL = 'PLAINTEXT';
            > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}';
            """
        )
    )

    # Both the ingest workers and the promotion loop run until this deadline.
    end_time = datetime.now() + timedelta(seconds=600)
    mz1 = "mz_old"  # currently-leading generation
    mz2 = "mz_new"  # generation being brought up / promoted next

    def worker(i: int) -> None:
        # Set up this worker's topic, upsert source, index, and view, then
        # keep ingesting the same upsert keys until the deadline.
        c.testdrive(
            dedent(
                f"""
                $ kafka-create-topic topic=kafka{i}
                > CREATE SOURCE kafka_source{i}
                  IN CLUSTER cluster
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka{i}-${{testdrive.seed}}');
                > CREATE TABLE kafka_source_tbl{i} (key1, key2, value1, value2)
                  FROM SOURCE kafka_source{i} (REFERENCE "testdrive-kafka{i}-${{testdrive.seed}}")
                  KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
                  VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
                  ENVELOPE UPSERT;
                > CREATE DEFAULT INDEX ON kafka_source_tbl{i}
                > CREATE MATERIALIZED VIEW mv{i} AS SELECT * FROM kafka_source_tbl{i}
                """
            )
        )

        while datetime.now() < end_time:
            try:
                c.testdrive(
                    dedent(
                        f"""
                        $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{i} repeat=10000
                        key1A,key1B:value1A,value1B
                        """
                    )
                )
            except Exception:
                # Ingestion is best-effort load generation: it can fail
                # transiently while a promotion is in flight. Never use a
                # bare `except:` here — it would also swallow
                # KeyboardInterrupt/SystemExit and make the test unkillable.
                pass

    threads = [
        Thread(name=f"worker_{i}", target=worker, args=(i,))
        for i in range(num_threads)
    ]

    for thread in threads:
        thread.start()

    # Repeatedly roll out new deployment generations while the workers
    # hammer the upsert sources.
    generation = 1
    while datetime.now() < end_time:
        with c.override(
            Materialized(
                name=mz2,
                sanity_restart=False,
                deploy_generation=generation,
                system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
                restart="on-failure",
                external_metadata_store=True,
            ),
            # Point testdrive at the current leader so the workers keep
            # functioning across the promotion.
            Testdrive(
                materialize_url=f"postgres://materialize@{mz1}:6875",
                materialize_url_internal=f"postgres://materialize@{mz1}:6877",
                mz_service=mz1,
                materialize_params={"cluster": "cluster"},
                no_consistency_checks=True,
                no_reset=True,
                seed=1,
                default_timeout=DEFAULT_TIMEOUT,
            ),
        ):
            c.up(mz2)
            c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, mz2)
            c.promote_mz(mz2)
            c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, mz2)

        generation += 1
        # The freshly promoted generation becomes the leader; the old leader
        # is reused as the next candidate.
        mz1, mz2 = mz2, mz1

    for thread in threads:
        thread.join()

0 comments on commit 7790629

Please sign in to comment.