mzcompose.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """Tests the dynamic tracing setup on environmentd"""
  10. import os
  11. import time
  12. import requests
  13. from materialize.mzcompose.composition import Composition
  14. from materialize.mzcompose.services.clusterd import Clusterd
  15. from materialize.mzcompose.services.materialized import Materialized
  16. from materialize.mzcompose.services.mz import Mz
  17. SENTRY_DSN = os.getenv("BUILDKITE_SENTRY_DSN")
  18. SERVICES = [
  19. Mz(app_password=""),
  20. Materialized(
  21. options=[
  22. "--opentelemetry-endpoint=whatever:7777",
  23. f"--sentry-dsn={SENTRY_DSN}",
  24. "--sentry-environment=development",
  25. ]
  26. ),
  27. Clusterd(name="clusterd"),
  28. ]
  29. def workflow_default(c: Composition) -> None:
  30. def process(name: str) -> None:
  31. if name == "default":
  32. return
  33. with c.test_case(name):
  34. c.workflow(name)
  35. c.test_parts(list(c.workflows.keys()), process)
  36. def workflow_with_everything(c: Composition) -> None:
  37. c.up("materialized")
  38. port = c.port("materialized", 6878)
  39. # Start with fastpath
  40. info = requests.get(f"http://localhost:{port}/api/tracing").json()
  41. assert info["current_level_filter"] == "info"
  42. # update the stderr config
  43. c.sql(
  44. "ALTER SYSTEM SET log_filter = 'foo=debug,info'",
  45. user="mz_system",
  46. port=6877,
  47. print_statement=False,
  48. )
  49. info = requests.get(f"http://localhost:{port}/api/tracing").json()
  50. assert info["current_level_filter"] == "debug"
  51. # update the otel config
  52. c.sql(
  53. "ALTER SYSTEM SET opentelemetry_filter = 'foo=trace,info'",
  54. user="mz_system",
  55. port=6877,
  56. print_statement=False,
  57. )
  58. info = requests.get(f"http://localhost:{port}/api/tracing").json()
  59. assert info["current_level_filter"] == "trace"
  60. # revert the otel config and make sure we go back
  61. c.sql(
  62. "ALTER SYSTEM SET opentelemetry_filter = 'off'",
  63. user="mz_system",
  64. port=6877,
  65. print_statement=False,
  66. )
  67. info = requests.get(f"http://localhost:{port}/api/tracing").json()
  68. assert info["current_level_filter"] == "debug"
  69. # update the sentry directives
  70. c.sql(
  71. "ALTER SYSTEM SET sentry_filters = 'foo=trace'",
  72. user="mz_system",
  73. port=6877,
  74. print_statement=False,
  75. )
  76. info = requests.get(f"http://localhost:{port}/api/tracing").json()
  77. assert info["current_level_filter"] == "trace"
  78. # revert the sentry directives and make sure we go back
  79. c.sql(
  80. "ALTER SYSTEM RESET sentry_filters",
  81. user="mz_system",
  82. port=6877,
  83. print_statement=False,
  84. )
  85. info = requests.get(f"http://localhost:{port}/api/tracing").json()
  86. assert info["current_level_filter"] == "debug"
  87. # make sure we can go allll the way back
  88. c.sql(
  89. "ALTER SYSTEM SET log_filter = 'info'",
  90. user="mz_system",
  91. port=6877,
  92. print_statement=False,
  93. )
  94. info = requests.get(f"http://localhost:{port}/api/tracing").json()
  95. assert info["current_level_filter"] == "info"
  96. def workflow_basic(c: Composition) -> None:
  97. with c.override(Materialized()):
  98. c.up("materialized")
  99. port = c.port("materialized", 6878)
  100. # Start with fastpath
  101. info = requests.get(f"http://localhost:{port}/api/tracing").json()
  102. assert info["current_level_filter"] == "info"
  103. # update the stderr config
  104. c.sql(
  105. "ALTER SYSTEM SET log_filter = 'foo=debug,info'",
  106. user="mz_system",
  107. port=6877,
  108. print_statement=False,
  109. )
  110. info = requests.get(f"http://localhost:{port}/api/tracing").json()
  111. assert info["current_level_filter"] == "debug"
  112. # make sure we can go back to normal
  113. c.sql(
  114. "ALTER SYSTEM SET log_filter = 'info'",
  115. user="mz_system",
  116. port=6877,
  117. print_statement=False,
  118. )
  119. info = requests.get(f"http://localhost:{port}/api/tracing").json()
  120. assert info["current_level_filter"] == "info"
  121. # Assert `EXPLAIN` doesn't break things in steady-state.
  122. c.sql(
  123. "EXPLAIN SELECT 1",
  124. print_statement=False,
  125. )
  126. info = requests.get(f"http://localhost:{port}/api/tracing").json()
  127. assert info["current_level_filter"] == "info"
  128. def workflow_clusterd(c: Composition) -> None:
  129. c.up("materialized", "clusterd")
  130. port = c.port("clusterd", 6878)
  131. c.sql(
  132. "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
  133. port=6877,
  134. user="mz_system",
  135. )
  136. c.sql(
  137. """
  138. CREATE CLUSTER c REPLICAS (r1 (
  139. STORAGECTL ADDRESSES ['clusterd:2100'],
  140. STORAGE ADDRESSES ['clusterd:2103'],
  141. COMPUTECTL ADDRESSES ['clusterd:2101'],
  142. COMPUTE ADDRESSES ['clusterd:2102'],
  143. WORKERS 1
  144. ))
  145. """
  146. )
  147. c.sql(
  148. "ALTER SYSTEM SET log_filter = 'foo=debug,info'",
  149. user="mz_system",
  150. port=6877,
  151. print_statement=False,
  152. )
  153. start = time.time()
  154. timeout = 10
  155. is_debug = False
  156. # the updated configuration is sent to clusterd asynchronously,
  157. # spin here until the new tracing level is observed.
  158. while time.time() - start < timeout:
  159. info = requests.get(f"http://localhost:{port}/api/tracing").json()
  160. if info["current_level_filter"] == "debug":
  161. is_debug = True
  162. break
  163. assert is_debug
  164. # Reset
  165. c.sql(
  166. "ALTER SYSTEM SET log_filter = 'info'",
  167. user="mz_system",
  168. port=6877,
  169. print_statement=False,
  170. )
  171. port = c.port("materialized", 6878)
  172. info = requests.get(f"http://localhost:{port}/api/tracing").json()
  173. assert info["current_level_filter"] == "info"