123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """Tests the dynamic tracing setup on environmentd"""
- import os
- import time
- import requests
- from materialize.mzcompose.composition import Composition
- from materialize.mzcompose.services.clusterd import Clusterd
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.mz import Mz
- SENTRY_DSN = os.getenv("BUILDKITE_SENTRY_DSN")
- SERVICES = [
- Mz(app_password=""),
- Materialized(
- options=[
- "--opentelemetry-endpoint=whatever:7777",
- f"--sentry-dsn={SENTRY_DSN}",
- "--sentry-environment=development",
- ]
- ),
- Clusterd(name="clusterd"),
- ]
- def workflow_default(c: Composition) -> None:
- def process(name: str) -> None:
- if name == "default":
- return
- with c.test_case(name):
- c.workflow(name)
- c.test_parts(list(c.workflows.keys()), process)
- def workflow_with_everything(c: Composition) -> None:
- c.up("materialized")
- port = c.port("materialized", 6878)
- # Start with fastpath
- info = requests.get(f"http://localhost:{port}/api/tracing").json()
- assert info["current_level_filter"] == "info"
- # update the stderr config
- c.sql(
- "ALTER SYSTEM SET log_filter = 'foo=debug,info'",
- user="mz_system",
- port=6877,
- print_statement=False,
- )
- info = requests.get(f"http://localhost:{port}/api/tracing").json()
- assert info["current_level_filter"] == "debug"
- # update the otel config
- c.sql(
- "ALTER SYSTEM SET opentelemetry_filter = 'foo=trace,info'",
- user="mz_system",
- port=6877,
- print_statement=False,
- )
- info = requests.get(f"http://localhost:{port}/api/tracing").json()
- assert info["current_level_filter"] == "trace"
- # revert the otel config and make sure we go back
- c.sql(
- "ALTER SYSTEM SET opentelemetry_filter = 'off'",
- user="mz_system",
- port=6877,
- print_statement=False,
- )
- info = requests.get(f"http://localhost:{port}/api/tracing").json()
- assert info["current_level_filter"] == "debug"
- # update the sentry directives
- c.sql(
- "ALTER SYSTEM SET sentry_filters = 'foo=trace'",
- user="mz_system",
- port=6877,
- print_statement=False,
- )
- info = requests.get(f"http://localhost:{port}/api/tracing").json()
- assert info["current_level_filter"] == "trace"
- # revert the sentry directives and make sure we go back
- c.sql(
- "ALTER SYSTEM RESET sentry_filters",
- user="mz_system",
- port=6877,
- print_statement=False,
- )
- info = requests.get(f"http://localhost:{port}/api/tracing").json()
- assert info["current_level_filter"] == "debug"
- # make sure we can go allll the way back
- c.sql(
- "ALTER SYSTEM SET log_filter = 'info'",
- user="mz_system",
- port=6877,
- print_statement=False,
- )
- info = requests.get(f"http://localhost:{port}/api/tracing").json()
- assert info["current_level_filter"] == "info"
- def workflow_basic(c: Composition) -> None:
- with c.override(Materialized()):
- c.up("materialized")
- port = c.port("materialized", 6878)
- # Start with fastpath
- info = requests.get(f"http://localhost:{port}/api/tracing").json()
- assert info["current_level_filter"] == "info"
- # update the stderr config
- c.sql(
- "ALTER SYSTEM SET log_filter = 'foo=debug,info'",
- user="mz_system",
- port=6877,
- print_statement=False,
- )
- info = requests.get(f"http://localhost:{port}/api/tracing").json()
- assert info["current_level_filter"] == "debug"
- # make sure we can go back to normal
- c.sql(
- "ALTER SYSTEM SET log_filter = 'info'",
- user="mz_system",
- port=6877,
- print_statement=False,
- )
- info = requests.get(f"http://localhost:{port}/api/tracing").json()
- assert info["current_level_filter"] == "info"
- # Assert `EXPLAIN` doesn't break things in steady-state.
- c.sql(
- "EXPLAIN SELECT 1",
- print_statement=False,
- )
- info = requests.get(f"http://localhost:{port}/api/tracing").json()
- assert info["current_level_filter"] == "info"
- def workflow_clusterd(c: Composition) -> None:
- c.up("materialized", "clusterd")
- port = c.port("clusterd", 6878)
- c.sql(
- "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
- port=6877,
- user="mz_system",
- )
- c.sql(
- """
- CREATE CLUSTER c REPLICAS (r1 (
- STORAGECTL ADDRESSES ['clusterd:2100'],
- STORAGE ADDRESSES ['clusterd:2103'],
- COMPUTECTL ADDRESSES ['clusterd:2101'],
- COMPUTE ADDRESSES ['clusterd:2102'],
- WORKERS 1
- ))
- """
- )
- c.sql(
- "ALTER SYSTEM SET log_filter = 'foo=debug,info'",
- user="mz_system",
- port=6877,
- print_statement=False,
- )
- start = time.time()
- timeout = 10
- is_debug = False
- # the updated configuration is sent to clusterd asynchronously,
- # spin here until the new tracing level is observed.
- while time.time() - start < timeout:
- info = requests.get(f"http://localhost:{port}/api/tracing").json()
- if info["current_level_filter"] == "debug":
- is_debug = True
- break
- assert is_debug
- # Reset
- c.sql(
- "ALTER SYSTEM SET log_filter = 'info'",
- user="mz_system",
- port=6877,
- print_statement=False,
- )
- port = c.port("materialized", 6878)
- info = requests.get(f"http://localhost:{port}/api/tracing").json()
- assert info["current_level_filter"] == "info"
|